You are viewing a plain text version of this content; the canonical link for this message was a hyperlink lost in the plain-text rendering (it can be found in the Apache mailing-list archives).
Posted to commits@airflow.apache.org by ma...@apache.org on 2017/06/08 15:36:45 UTC
[02/18] incubator-airflow git commit: [AIRFLOW-1160] Update Spark parameters for Mesos
[AIRFLOW-1160] Update Spark parameters for Mesos
Closes #2265 from cameres/master
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b0245e4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b0245e4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b0245e4a
Branch: refs/heads/v1-8-test
Commit: b0245e4a3dacf665e26446acafb3e80f558a83e0
Parents: 9744b1a
Author: Connor Ameres <co...@gmail.com>
Authored: Mon May 1 23:22:04 2017 +0200
Committer: Maxime Beauchemin <ma...@gmail.com>
Committed: Thu Jun 8 08:36:20 2017 -0700
----------------------------------------------------------------------
airflow/contrib/hooks/spark_sql_hook.py | 8 +++++++-
airflow/contrib/hooks/spark_submit_hook.py | 8 +++++++-
airflow/contrib/operators/spark_sql_operator.py | 7 ++++++-
airflow/contrib/operators/spark_submit_operator.py | 7 ++++++-
tests/contrib/hooks/spark_submit_hook.py | 2 ++
tests/contrib/operators/spark_submit_operator.py | 2 ++
6 files changed, 30 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0245e4a/airflow/contrib/hooks/spark_sql_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_sql_hook.py b/airflow/contrib/hooks/spark_sql_hook.py
index 37df3fe..8d73f60 100644
--- a/airflow/contrib/hooks/spark_sql_hook.py
+++ b/airflow/contrib/hooks/spark_sql_hook.py
@@ -31,7 +31,9 @@ class SparkSqlHook(BaseHook):
:type conf: str (format: PROP=VALUE)
:param conn_id: connection_id string
:type conn_id: str
- :param executor_cores: Number of cores per executor
+ :param total_executor_cores: (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker)
+ :type total_executor_cores: int
+ :param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2)
:type executor_cores: int
:param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
:type executor_memory: str
@@ -52,6 +54,7 @@ class SparkSqlHook(BaseHook):
sql,
conf=None,
conn_id='spark_sql_default',
+ total_executor_cores=None,
executor_cores=None,
executor_memory=None,
keytab=None,
@@ -64,6 +67,7 @@ class SparkSqlHook(BaseHook):
self._sql = sql
self._conf = conf
self._conn = self.get_connection(conn_id)
+ self._total_executor_cores = total_executor_cores
self._executor_cores = executor_cores
self._executor_memory = executor_memory
self._keytab = keytab
@@ -89,6 +93,8 @@ class SparkSqlHook(BaseHook):
if self._conf:
for conf_el in self._conf.split(","):
connection_cmd += ["--conf", conf_el]
+ if self._total_executor_cores:
+ connection_cmd += ["--total-executor-cores", str(self._total_executor_cores)]
if self._executor_cores:
connection_cmd += ["--executor-cores", str(self._executor_cores)]
if self._executor_memory:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0245e4a/airflow/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index e4ce797..c34538e 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -42,7 +42,9 @@ class SparkSubmitHook(BaseHook):
:type jars: str
:param java_class: the main class of the Java application
:type java_class: str
- :param executor_cores: Number of cores per executor (Default: 2)
+ :param total_executor_cores: (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker)
+ :type total_executor_cores: int
+ :param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2)
:type executor_cores: int
:param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
:type executor_memory: str
@@ -69,6 +71,7 @@ class SparkSubmitHook(BaseHook):
py_files=None,
jars=None,
java_class=None,
+ total_executor_cores=None,
executor_cores=None,
executor_memory=None,
driver_memory=None,
@@ -84,6 +87,7 @@ class SparkSubmitHook(BaseHook):
self._py_files = py_files
self._jars = jars
self._java_class = java_class
+ self._total_executor_cores = total_executor_cores
self._executor_cores = executor_cores
self._executor_memory = executor_memory
self._driver_memory = driver_memory
@@ -163,6 +167,8 @@ class SparkSubmitHook(BaseHook):
connection_cmd += ["--jars", self._jars]
if self._num_executors:
connection_cmd += ["--num-executors", str(self._num_executors)]
+ if self._total_executor_cores:
+ connection_cmd += ["--total-executor-cores", str(self._total_executor_cores)]
if self._executor_cores:
connection_cmd += ["--executor-cores", str(self._executor_cores)]
if self._executor_memory:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0245e4a/airflow/contrib/operators/spark_sql_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/spark_sql_operator.py b/airflow/contrib/operators/spark_sql_operator.py
index 2bed535..246e808 100644
--- a/airflow/contrib/operators/spark_sql_operator.py
+++ b/airflow/contrib/operators/spark_sql_operator.py
@@ -26,7 +26,9 @@ class SparkSqlOperator(BaseOperator):
:type conf: str (format: PROP=VALUE)
:param conn_id: connection_id string
:type conn_id: str
- :param executor_cores: Number of cores per executor
+ :param total_executor_cores: (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker)
+ :type total_executor_cores: int
+ :param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2)
:type executor_cores: int
:param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
:type executor_memory: str
@@ -52,6 +54,7 @@ class SparkSqlOperator(BaseOperator):
sql,
conf=None,
conn_id='spark_sql_default',
+ total_executor_cores=None,
executor_cores=None,
executor_memory=None,
keytab=None,
@@ -65,6 +68,7 @@ class SparkSqlOperator(BaseOperator):
self._sql = sql
self._conf = conf
self._conn_id = conn_id
+ self._total_executor_cores = total_executor_cores
self._executor_cores = executor_cores
self._executor_memory = executor_memory
self._keytab = keytab
@@ -81,6 +85,7 @@ class SparkSqlOperator(BaseOperator):
self._hook = SparkSqlHook(sql=self._sql,
conf=self._conf,
conn_id=self._conn_id,
+ total_executor_cores=self._total_executor_cores,
executor_cores=self._executor_cores,
executor_memory=self._executor_memory,
keytab=self._keytab,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0245e4a/airflow/contrib/operators/spark_submit_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/spark_submit_operator.py b/airflow/contrib/operators/spark_submit_operator.py
index 2a7e3cf..77aacd3 100644
--- a/airflow/contrib/operators/spark_submit_operator.py
+++ b/airflow/contrib/operators/spark_submit_operator.py
@@ -42,7 +42,9 @@ class SparkSubmitOperator(BaseOperator):
:type jars: str
:param java_class: the main class of the Java application
:type java_class: str
- :param executor_cores: Number of cores per executor (Default: 2)
+ :param total_executor_cores: (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker)
+ :type total_executor_cores: int
+ :param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2)
:type executor_cores: int
:param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
:type executor_memory: str
@@ -71,6 +73,7 @@ class SparkSubmitOperator(BaseOperator):
py_files=None,
jars=None,
java_class=None,
+ total_executor_cores=None,
executor_cores=None,
executor_memory=None,
driver_memory=None,
@@ -89,6 +92,7 @@ class SparkSubmitOperator(BaseOperator):
self._py_files = py_files
self._jars = jars
self._java_class = java_class
+ self._total_executor_cores = total_executor_cores
self._executor_cores = executor_cores
self._executor_memory = executor_memory
self._driver_memory = driver_memory
@@ -112,6 +116,7 @@ class SparkSubmitOperator(BaseOperator):
py_files=self._py_files,
jars=self._jars,
java_class=self._java_class,
+ total_executor_cores=self._total_executor_cores,
executor_cores=self._executor_cores,
executor_memory=self._executor_memory,
driver_memory=self._driver_memory,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0245e4a/tests/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/spark_submit_hook.py b/tests/contrib/hooks/spark_submit_hook.py
index c17ad28..c156a3f 100644
--- a/tests/contrib/hooks/spark_submit_hook.py
+++ b/tests/contrib/hooks/spark_submit_hook.py
@@ -31,6 +31,7 @@ class TestSparkSubmitHook(unittest.TestCase):
'files': 'hive-site.xml',
'py_files': 'sample_library.py',
'jars': 'parquet.jar',
+ 'total_executor_cores': 4,
'executor_cores': 4,
'executor_memory': '22g',
'keytab': 'privileged_user.keytab',
@@ -86,6 +87,7 @@ class TestSparkSubmitHook(unittest.TestCase):
assert "--files {}".format(self._config['files']) in cmd
assert "--py-files {}".format(self._config['py_files']) in cmd
assert "--jars {}".format(self._config['jars']) in cmd
+ assert "--total-executor-cores {}".format(self._config['total_executor_cores']) in cmd
assert "--executor-cores {}".format(self._config['executor_cores']) in cmd
assert "--executor-memory {}".format(self._config['executor_memory']) in cmd
assert "--keytab {}".format(self._config['keytab']) in cmd
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b0245e4a/tests/contrib/operators/spark_submit_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/spark_submit_operator.py b/tests/contrib/operators/spark_submit_operator.py
index 31bfcfa..531235f 100644
--- a/tests/contrib/operators/spark_submit_operator.py
+++ b/tests/contrib/operators/spark_submit_operator.py
@@ -30,6 +30,7 @@ class TestSparkSubmitOperator(unittest.TestCase):
'files': 'hive-site.xml',
'py_files': 'sample_library.py',
'jars': 'parquet.jar',
+ 'total_executor_cores':4,
'executor_cores': 4,
'executor_memory': '22g',
'keytab': 'privileged_user.keytab',
@@ -68,6 +69,7 @@ class TestSparkSubmitOperator(unittest.TestCase):
self.assertEqual(self._config['files'], operator._files)
self.assertEqual(self._config['py_files'], operator._py_files)
self.assertEqual(self._config['jars'], operator._jars)
+ self.assertEqual(self._config['total_executor_cores'], operator._total_executor_cores)
self.assertEqual(self._config['executor_cores'], operator._executor_cores)
self.assertEqual(self._config['executor_memory'], operator._executor_memory)
self.assertEqual(self._config['keytab'], operator._keytab)