You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ma...@apache.org on 2017/06/08 15:36:45 UTC

[02/18] incubator-airflow git commit: [AIRFLOW-1160] Update Spark parameters for Mesos

[AIRFLOW-1160] Update Spark parameters for Mesos

Closes #2265 from cameres/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b0245e4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b0245e4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b0245e4a

Branch: refs/heads/v1-8-test
Commit: b0245e4a3dacf665e26446acafb3e80f558a83e0
Parents: 9744b1a
Author: Connor Ameres <co...@gmail.com>
Authored: Mon May 1 23:22:04 2017 +0200
Committer: Maxime Beauchemin <ma...@gmail.com>
Committed: Thu Jun 8 08:36:20 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/spark_sql_hook.py            | 8 +++++++-
 airflow/contrib/hooks/spark_submit_hook.py         | 8 +++++++-
 airflow/contrib/operators/spark_sql_operator.py    | 7 ++++++-
 airflow/contrib/operators/spark_submit_operator.py | 7 ++++++-
 tests/contrib/hooks/spark_submit_hook.py           | 2 ++
 tests/contrib/operators/spark_submit_operator.py   | 2 ++
 6 files changed, 30 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0245e4a/airflow/contrib/hooks/spark_sql_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_sql_hook.py b/airflow/contrib/hooks/spark_sql_hook.py
index 37df3fe..8d73f60 100644
--- a/airflow/contrib/hooks/spark_sql_hook.py
+++ b/airflow/contrib/hooks/spark_sql_hook.py
@@ -31,7 +31,9 @@ class SparkSqlHook(BaseHook):
     :type conf: str (format: PROP=VALUE)
     :param conn_id: connection_id string
     :type conn_id: str
-    :param executor_cores: Number of cores per executor
+    :param total_executor_cores: (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker)
+    :type total_executor_cores: int
+    :param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2)
     :type executor_cores: int
     :param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
     :type executor_memory: str
@@ -52,6 +54,7 @@ class SparkSqlHook(BaseHook):
                  sql,
                  conf=None,
                  conn_id='spark_sql_default',
+                 total_executor_cores=None,
                  executor_cores=None,
                  executor_memory=None,
                  keytab=None,
@@ -64,6 +67,7 @@ class SparkSqlHook(BaseHook):
         self._sql = sql
         self._conf = conf
         self._conn = self.get_connection(conn_id)
+        self._total_executor_cores = total_executor_cores
         self._executor_cores = executor_cores
         self._executor_memory = executor_memory
         self._keytab = keytab
@@ -89,6 +93,8 @@ class SparkSqlHook(BaseHook):
         if self._conf:
             for conf_el in self._conf.split(","):
                 connection_cmd += ["--conf", conf_el]
+        if self._total_executor_cores:
+            connection_cmd += ["--total-executor-cores", str(self._total_executor_cores)]
         if self._executor_cores:
             connection_cmd += ["--executor-cores", str(self._executor_cores)]
         if self._executor_memory:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0245e4a/airflow/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index e4ce797..c34538e 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -42,7 +42,9 @@ class SparkSubmitHook(BaseHook):
     :type jars: str
     :param java_class: the main class of the Java application
     :type java_class: str
-    :param executor_cores: Number of cores per executor (Default: 2)
+    :param total_executor_cores: (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker)
+    :type total_executor_cores: int
+    :param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2)
     :type executor_cores: int
     :param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
     :type executor_memory: str
@@ -69,6 +71,7 @@ class SparkSubmitHook(BaseHook):
                  py_files=None,
                  jars=None,
                  java_class=None,
+                 total_executor_cores=None,
                  executor_cores=None,
                  executor_memory=None,
                  driver_memory=None,
@@ -84,6 +87,7 @@ class SparkSubmitHook(BaseHook):
         self._py_files = py_files
         self._jars = jars
         self._java_class = java_class
+        self._total_executor_cores = total_executor_cores
         self._executor_cores = executor_cores
         self._executor_memory = executor_memory
         self._driver_memory = driver_memory
@@ -163,6 +167,8 @@ class SparkSubmitHook(BaseHook):
             connection_cmd += ["--jars", self._jars]
         if self._num_executors:
             connection_cmd += ["--num-executors", str(self._num_executors)]
+        if self._total_executor_cores:
+            connection_cmd += ["--total-executor-cores", str(self._total_executor_cores)]
         if self._executor_cores:
             connection_cmd += ["--executor-cores", str(self._executor_cores)]
         if self._executor_memory:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0245e4a/airflow/contrib/operators/spark_sql_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/spark_sql_operator.py b/airflow/contrib/operators/spark_sql_operator.py
index 2bed535..246e808 100644
--- a/airflow/contrib/operators/spark_sql_operator.py
+++ b/airflow/contrib/operators/spark_sql_operator.py
@@ -26,7 +26,9 @@ class SparkSqlOperator(BaseOperator):
     :type conf: str (format: PROP=VALUE)
     :param conn_id: connection_id string
     :type conn_id: str
-    :param executor_cores: Number of cores per executor
+    :param total_executor_cores: (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker)
+    :type total_executor_cores: int
+    :param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2)
     :type executor_cores: int
     :param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
     :type executor_memory: str
@@ -52,6 +54,7 @@ class SparkSqlOperator(BaseOperator):
                  sql,
                  conf=None,
                  conn_id='spark_sql_default',
+                 total_executor_cores=None,
                  executor_cores=None,
                  executor_memory=None,
                  keytab=None,
@@ -65,6 +68,7 @@ class SparkSqlOperator(BaseOperator):
         self._sql = sql
         self._conf = conf
         self._conn_id = conn_id
+        self._total_executor_cores = total_executor_cores
         self._executor_cores = executor_cores
         self._executor_memory = executor_memory
         self._keytab = keytab
@@ -81,6 +85,7 @@ class SparkSqlOperator(BaseOperator):
         self._hook = SparkSqlHook(sql=self._sql,
                                   conf=self._conf,
                                   conn_id=self._conn_id,
+                                  total_executor_cores=self._total_executor_cores,
                                   executor_cores=self._executor_cores,
                                   executor_memory=self._executor_memory,
                                   keytab=self._keytab,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0245e4a/airflow/contrib/operators/spark_submit_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/spark_submit_operator.py b/airflow/contrib/operators/spark_submit_operator.py
index 2a7e3cf..77aacd3 100644
--- a/airflow/contrib/operators/spark_submit_operator.py
+++ b/airflow/contrib/operators/spark_submit_operator.py
@@ -42,7 +42,9 @@ class SparkSubmitOperator(BaseOperator):
     :type jars: str
     :param java_class: the main class of the Java application
     :type java_class: str
-    :param executor_cores: Number of cores per executor (Default: 2)
+    :param total_executor_cores: (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker)
+    :type total_executor_cores: int
+    :param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2)
     :type executor_cores: int
     :param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
     :type executor_memory: str
@@ -71,6 +73,7 @@ class SparkSubmitOperator(BaseOperator):
                  py_files=None,
                  jars=None,
                  java_class=None,
+                 total_executor_cores=None,
                  executor_cores=None,
                  executor_memory=None,
                  driver_memory=None,
@@ -89,6 +92,7 @@ class SparkSubmitOperator(BaseOperator):
         self._py_files = py_files
         self._jars = jars
         self._java_class = java_class
+        self._total_executor_cores = total_executor_cores
         self._executor_cores = executor_cores
         self._executor_memory = executor_memory
         self._driver_memory = driver_memory
@@ -112,6 +116,7 @@ class SparkSubmitOperator(BaseOperator):
             py_files=self._py_files,
             jars=self._jars,
             java_class=self._java_class,
+            total_executor_cores=self._total_executor_cores,
             executor_cores=self._executor_cores,
             executor_memory=self._executor_memory,
             driver_memory=self._driver_memory,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0245e4a/tests/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/spark_submit_hook.py b/tests/contrib/hooks/spark_submit_hook.py
index c17ad28..c156a3f 100644
--- a/tests/contrib/hooks/spark_submit_hook.py
+++ b/tests/contrib/hooks/spark_submit_hook.py
@@ -31,6 +31,7 @@ class TestSparkSubmitHook(unittest.TestCase):
         'files': 'hive-site.xml',
         'py_files': 'sample_library.py',
         'jars': 'parquet.jar',
+        'total_executor_cores': 4,
         'executor_cores': 4,
         'executor_memory': '22g',
         'keytab': 'privileged_user.keytab',
@@ -86,6 +87,7 @@ class TestSparkSubmitHook(unittest.TestCase):
         assert "--files {}".format(self._config['files']) in cmd
         assert "--py-files {}".format(self._config['py_files']) in cmd
         assert "--jars {}".format(self._config['jars']) in cmd
+        assert "--total-executor-cores {}".format(self._config['total_executor_cores']) in cmd
         assert "--executor-cores {}".format(self._config['executor_cores']) in cmd
         assert "--executor-memory {}".format(self._config['executor_memory']) in cmd
         assert "--keytab {}".format(self._config['keytab']) in cmd

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0245e4a/tests/contrib/operators/spark_submit_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/spark_submit_operator.py b/tests/contrib/operators/spark_submit_operator.py
index 31bfcfa..531235f 100644
--- a/tests/contrib/operators/spark_submit_operator.py
+++ b/tests/contrib/operators/spark_submit_operator.py
@@ -30,6 +30,7 @@ class TestSparkSubmitOperator(unittest.TestCase):
         'files': 'hive-site.xml',
         'py_files': 'sample_library.py',
         'jars': 'parquet.jar',
+        'total_executor_cores': 4,
         'executor_cores': 4,
         'executor_memory': '22g',
         'keytab': 'privileged_user.keytab',
@@ -68,6 +69,7 @@ class TestSparkSubmitOperator(unittest.TestCase):
         self.assertEqual(self._config['files'], operator._files)
         self.assertEqual(self._config['py_files'], operator._py_files)
         self.assertEqual(self._config['jars'], operator._jars)
+        self.assertEqual(self._config['total_executor_cores'], operator._total_executor_cores)
         self.assertEqual(self._config['executor_cores'], operator._executor_cores)
         self.assertEqual(self._config['executor_memory'], operator._executor_memory)
         self.assertEqual(self._config['keytab'], operator._keytab)