You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/04/25 09:28:52 UTC
incubator-airflow git commit: [AIRFLOW-1089] Add Spark application arguments
Repository: incubator-airflow
Updated Branches:
refs/heads/master 831f8d504 -> e5b914789
[AIRFLOW-1089] Add Spark application arguments
Allows arguments to be passed to the Spark application being submitted. For example:
- spark-submit --class foo.Bar foobar.jar arg1 arg2
- spark-submit app.py arg1 arg2
Closes #2229 from camshrun/sparkSubmitAppArgs
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e5b91478
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e5b91478
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e5b91478
Branch: refs/heads/master
Commit: e5b9147894b0d47bf36f1c2570d765b16c1c2506
Parents: 831f8d5
Author: Stephan Werges <sw...@accertify.com>
Authored: Tue Apr 25 11:28:31 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Apr 25 11:28:31 2017 +0200
----------------------------------------------------------------------
airflow/contrib/hooks/spark_submit_hook.py | 9 +++++++++
airflow/contrib/operators/spark_submit_operator.py | 5 +++++
tests/contrib/hooks/test_spark_submit_hook.py | 16 +++++++++++++++-
.../contrib/operators/test_spark_submit_operator.py | 7 ++++++-
4 files changed, 35 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e5b91478/airflow/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index 59d28b5..e4ce797 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -56,6 +56,8 @@ class SparkSubmitHook(BaseHook):
:type name: str
:param num_executors: Number of executors to launch
:type num_executors: int
+ :param application_args: Arguments for the application being submitted
+ :type application_args: list
:param verbose: Whether to pass the verbose flag to spark-submit process for debugging
:type verbose: bool
"""
@@ -74,6 +76,7 @@ class SparkSubmitHook(BaseHook):
principal=None,
name='default-name',
num_executors=None,
+ application_args=None,
verbose=False):
self._conf = conf
self._conn_id = conn_id
@@ -88,6 +91,7 @@ class SparkSubmitHook(BaseHook):
self._principal = principal
self._name = name
self._num_executors = num_executors
+ self._application_args = application_args
self._verbose = verbose
self._sp = None
self._yarn_application_id = None
@@ -183,6 +187,11 @@ class SparkSubmitHook(BaseHook):
# The actual script to execute
connection_cmd += [application]
+ # Append any application arguments
+ if self._application_args:
+ for arg in self._application_args:
+ connection_cmd += [arg]
+
logging.debug("Spark-Submit cmd: {}".format(connection_cmd))
return connection_cmd
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e5b91478/airflow/contrib/operators/spark_submit_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/spark_submit_operator.py b/airflow/contrib/operators/spark_submit_operator.py
index f62c395..2a7e3cf 100644
--- a/airflow/contrib/operators/spark_submit_operator.py
+++ b/airflow/contrib/operators/spark_submit_operator.py
@@ -56,6 +56,8 @@ class SparkSubmitOperator(BaseOperator):
:type name: str
:param num_executors: Number of executors to launch
:type num_executors: int
+ :param application_args: Arguments for the application being submitted
+ :type application_args: list
:param verbose: Whether to pass the verbose flag to spark-submit process for debugging
:type verbose: bool
"""
@@ -76,6 +78,7 @@ class SparkSubmitOperator(BaseOperator):
principal=None,
name='airflow-spark',
num_executors=None,
+ application_args=None,
verbose=False,
*args,
**kwargs):
@@ -93,6 +96,7 @@ class SparkSubmitOperator(BaseOperator):
self._principal = principal
self._name = name
self._num_executors = num_executors
+ self._application_args = application_args
self._verbose = verbose
self._hook = None
self._conn_id = conn_id
@@ -115,6 +119,7 @@ class SparkSubmitOperator(BaseOperator):
principal=self._principal,
name=self._name,
num_executors=self._num_executors,
+ application_args=self._application_args,
verbose=self._verbose
)
self._hook.submit(self._application)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e5b91478/tests/contrib/hooks/test_spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_spark_submit_hook.py b/tests/contrib/hooks/test_spark_submit_hook.py
index 24315fa..81916ad 100644
--- a/tests/contrib/hooks/test_spark_submit_hook.py
+++ b/tests/contrib/hooks/test_spark_submit_hook.py
@@ -42,7 +42,12 @@ class TestSparkSubmitHook(unittest.TestCase):
'num_executors': 10,
'verbose': True,
'driver_memory': '3g',
- 'java_class': 'com.foo.bar.AppMain'
+ 'java_class': 'com.foo.bar.AppMain',
+ 'application_args': [
+ '-f foo',
+ '--bar bar',
+ 'baz'
+ ]
}
def setUp(self):
@@ -102,6 +107,15 @@ class TestSparkSubmitHook(unittest.TestCase):
for k in self._config['conf']:
assert "--conf {0}={1}".format(k, self._config['conf'][k]) in cmd
+ # Check the application arguments are there
+ for a in self._config['application_args']:
+ assert a in cmd
+
+ # Check if application arguments are after the application
+ application_idx = cmd.find(self._spark_job_file)
+ for a in self._config['application_args']:
+ assert cmd.find(a) > application_idx
+
if self._config['verbose']:
assert "--verbose" in cmd
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e5b91478/tests/contrib/operators/test_spark_submit_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_spark_submit_operator.py b/tests/contrib/operators/test_spark_submit_operator.py
index 3c11dbb..dd3d84b 100644
--- a/tests/contrib/operators/test_spark_submit_operator.py
+++ b/tests/contrib/operators/test_spark_submit_operator.py
@@ -41,7 +41,11 @@ class TestSparkSubmitOperator(unittest.TestCase):
'verbose': True,
'application': 'test_application.py',
'driver_memory': '3g',
- 'java_class': 'com.foo.bar.AppMain'
+ 'java_class': 'com.foo.bar.AppMain',
+ 'application_args': [
+ '-f foo',
+ '--bar bar'
+ ]
}
def setUp(self):
@@ -80,6 +84,7 @@ class TestSparkSubmitOperator(unittest.TestCase):
self.assertEqual(self._config['verbose'], operator._verbose)
self.assertEqual(self._config['java_class'], operator._java_class)
self.assertEqual(self._config['driver_memory'], operator._driver_memory)
+ self.assertEqual(self._config['application_args'], operator._application_args)