You are viewing a plain text version of this content. The canonical (HTML) version is available at the original archive link.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/04/25 09:28:52 UTC

incubator-airflow git commit: [AIRFLOW-1089] Add Spark application arguments

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 831f8d504 -> e5b914789


[AIRFLOW-1089] Add Spark application arguments

Allows arguments to be passed to the Spark application being
submitted. For example:

- spark-submit --class foo.Bar foobar.jar arg1
arg2
- spark-submit app.py arg1 arg2

Closes #2229 from camshrun/sparkSubmitAppArgs


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e5b91478
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e5b91478
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e5b91478

Branch: refs/heads/master
Commit: e5b9147894b0d47bf36f1c2570d765b16c1c2506
Parents: 831f8d5
Author: Stephan Werges <sw...@accertify.com>
Authored: Tue Apr 25 11:28:31 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Apr 25 11:28:31 2017 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/spark_submit_hook.py          |  9 +++++++++
 airflow/contrib/operators/spark_submit_operator.py  |  5 +++++
 tests/contrib/hooks/test_spark_submit_hook.py       | 16 +++++++++++++++-
 .../contrib/operators/test_spark_submit_operator.py |  7 ++++++-
 4 files changed, 35 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e5b91478/airflow/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index 59d28b5..e4ce797 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -56,6 +56,8 @@ class SparkSubmitHook(BaseHook):
     :type name: str
     :param num_executors: Number of executors to launch
     :type num_executors: int
+    :param application_args: Arguments for the application being submitted
+    :type application_args: list
     :param verbose: Whether to pass the verbose flag to spark-submit process for debugging
     :type verbose: bool
     """
@@ -74,6 +76,7 @@ class SparkSubmitHook(BaseHook):
                  principal=None,
                  name='default-name',
                  num_executors=None,
+                 application_args=None,
                  verbose=False):
         self._conf = conf
         self._conn_id = conn_id
@@ -88,6 +91,7 @@ class SparkSubmitHook(BaseHook):
         self._principal = principal
         self._name = name
         self._num_executors = num_executors
+        self._application_args = application_args
         self._verbose = verbose
         self._sp = None
         self._yarn_application_id = None
@@ -183,6 +187,11 @@ class SparkSubmitHook(BaseHook):
         # The actual script to execute
         connection_cmd += [application]
 
+        # Append any application arguments
+        if self._application_args:
+            for arg in self._application_args:
+                connection_cmd += [arg]
+
         logging.debug("Spark-Submit cmd: {}".format(connection_cmd))
 
         return connection_cmd

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e5b91478/airflow/contrib/operators/spark_submit_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/spark_submit_operator.py b/airflow/contrib/operators/spark_submit_operator.py
index f62c395..2a7e3cf 100644
--- a/airflow/contrib/operators/spark_submit_operator.py
+++ b/airflow/contrib/operators/spark_submit_operator.py
@@ -56,6 +56,8 @@ class SparkSubmitOperator(BaseOperator):
     :type name: str
     :param num_executors: Number of executors to launch
     :type num_executors: int
+    :param application_args: Arguments for the application being submitted
+    :type application_args: list
     :param verbose: Whether to pass the verbose flag to spark-submit process for debugging
     :type verbose: bool
     """
@@ -76,6 +78,7 @@ class SparkSubmitOperator(BaseOperator):
                  principal=None,
                  name='airflow-spark',
                  num_executors=None,
+                 application_args=None,
                  verbose=False,
                  *args,
                  **kwargs):
@@ -93,6 +96,7 @@ class SparkSubmitOperator(BaseOperator):
         self._principal = principal
         self._name = name
         self._num_executors = num_executors
+        self._application_args = application_args
         self._verbose = verbose
         self._hook = None
         self._conn_id = conn_id
@@ -115,6 +119,7 @@ class SparkSubmitOperator(BaseOperator):
             principal=self._principal,
             name=self._name,
             num_executors=self._num_executors,
+            application_args=self._application_args,
             verbose=self._verbose
         )
         self._hook.submit(self._application)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e5b91478/tests/contrib/hooks/test_spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_spark_submit_hook.py b/tests/contrib/hooks/test_spark_submit_hook.py
index 24315fa..81916ad 100644
--- a/tests/contrib/hooks/test_spark_submit_hook.py
+++ b/tests/contrib/hooks/test_spark_submit_hook.py
@@ -42,7 +42,12 @@ class TestSparkSubmitHook(unittest.TestCase):
         'num_executors': 10,
         'verbose': True,
         'driver_memory': '3g',
-        'java_class': 'com.foo.bar.AppMain'
+        'java_class': 'com.foo.bar.AppMain',
+        'application_args': [
+            '-f foo',
+            '--bar bar',
+            'baz'
+        ]
     }
 
     def setUp(self):
@@ -102,6 +107,15 @@ class TestSparkSubmitHook(unittest.TestCase):
         for k in self._config['conf']:
             assert "--conf {0}={1}".format(k, self._config['conf'][k]) in cmd
 
+        # Check the application arguments are there
+        for a in self._config['application_args']:
+            assert a in cmd
+
+        # Check if application arguments are after the application
+        application_idx = cmd.find(self._spark_job_file)
+        for a in self._config['application_args']:
+            assert cmd.find(a) > application_idx
+
         if self._config['verbose']:
             assert "--verbose" in cmd
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e5b91478/tests/contrib/operators/test_spark_submit_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_spark_submit_operator.py b/tests/contrib/operators/test_spark_submit_operator.py
index 3c11dbb..dd3d84b 100644
--- a/tests/contrib/operators/test_spark_submit_operator.py
+++ b/tests/contrib/operators/test_spark_submit_operator.py
@@ -41,7 +41,11 @@ class TestSparkSubmitOperator(unittest.TestCase):
         'verbose': True,
         'application': 'test_application.py',
         'driver_memory': '3g',
-        'java_class': 'com.foo.bar.AppMain'
+        'java_class': 'com.foo.bar.AppMain',
+        'application_args': [
+            '-f foo',
+            '--bar bar'
+        ]
     }
 
     def setUp(self):
@@ -80,6 +84,7 @@ class TestSparkSubmitOperator(unittest.TestCase):
         self.assertEqual(self._config['verbose'], operator._verbose)
         self.assertEqual(self._config['java_class'], operator._java_class)
         self.assertEqual(self._config['driver_memory'], operator._driver_memory)
+        self.assertEqual(self._config['application_args'], operator._application_args)