You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/09/06 11:37:37 UTC
incubator-airflow git commit: [AIRFLOW-1562] Spark-sql logging contains deadlock
Repository: incubator-airflow
Updated Branches:
refs/heads/master 9df0ac64c -> 32750601a
[AIRFLOW-1562] Spark-sql logging contains deadlock
Logging in SparkSqlOperator does not work as intended. Spark-sql
internally redirects all logs to stdout (including stderr), which
causes the current two-iterator logging to get stuck on the stderr
pipe. This can lead to a deadlock: the hook reads stdout to the end
before it reads stderr, so stderr can grow until its pipe is full, at
which point the process blocks until the pipe is consumed. That only
happens when the process ends, so the process stalls.
Closes #2563 from Fokko/AIRFLOW-1562-Spark-sql-loggin-contains-deadlock
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/32750601
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/32750601
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/32750601
Branch: refs/heads/master
Commit: 32750601ad0a422283613bf7fccff8eb5407bc9c
Parents: 9df0ac6
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Wed Sep 6 13:37:31 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Sep 6 13:37:31 2017 +0200
----------------------------------------------------------------------
airflow/contrib/hooks/spark_sql_hook.py | 33 ++++-----
airflow/contrib/operators/spark_sql_operator.py | 3 +
.../operators/test_spark_sql_operator.py | 73 ++++++++++++++++++++
3 files changed, 93 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32750601/airflow/contrib/hooks/spark_sql_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_sql_hook.py b/airflow/contrib/hooks/spark_sql_hook.py
index 8d73f60..d7bef7b 100644
--- a/airflow/contrib/hooks/spark_sql_hook.py
+++ b/airflow/contrib/hooks/spark_sql_hook.py
@@ -58,6 +58,7 @@ class SparkSqlHook(BaseHook):
executor_cores=None,
executor_memory=None,
keytab=None,
+ principal=None,
master='yarn',
name='default-name',
num_executors=None,
@@ -71,6 +72,7 @@ class SparkSqlHook(BaseHook):
self._executor_cores = executor_cores
self._executor_memory = executor_memory
self._keytab = keytab
+ self._principal = principal
self._master = master
self._name = name
self._num_executors = num_executors
@@ -101,6 +103,8 @@ class SparkSqlHook(BaseHook):
connection_cmd += ["--executor-memory", self._executor_memory]
if self._keytab:
connection_cmd += ["--keytab", self._keytab]
+ if self._principal:
+ connection_cmd += ["--principal", self._principal]
if self._num_executors:
connection_cmd += ["--num-executors", str(self._num_executors)]
if self._sql:
@@ -130,25 +134,22 @@ class SparkSqlHook(BaseHook):
:param cmd: command to remotely execute
:param kwargs: extra arguments to Popen (see subprocess.Popen)
"""
- prefixed_cmd = self._prepare_command(cmd)
- self._sp = subprocess.Popen(prefixed_cmd,
+ spark_sql_cmd = self._prepare_command(cmd)
+ self._sp = subprocess.Popen(spark_sql_cmd,
stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
**kwargs)
- # using two iterators here to support 'real-time' logging
- for line in iter(self._sp.stdout.readline, b''):
- line = line.decode('utf-8').strip()
- logging.info(line)
- for line in iter(self._sp.stderr.readline, b''):
- line = line.decode('utf-8').strip()
- logging.info(line)
- output, stderr = self._sp.communicate()
- if self._sp.returncode:
- raise AirflowException("Cannot execute {} on {}. Error code is: "
- "{}. Output: {}, Stderr: {}"
- .format(cmd, self._conn.host,
- self._sp.returncode, output, stderr))
+ self._process_log(iter(self._sp.stdout.readline, b''))
+
+ returncode = self._sp.wait()
+
+ if returncode:
+ raise AirflowException(
+ "Cannot execute {} on {}. Process exit code: {}.".format(
+ cmd, self._conn.host, returncode
+ )
+ )
def kill(self):
if self._sp and self._sp.poll() is None:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32750601/airflow/contrib/operators/spark_sql_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/spark_sql_operator.py b/airflow/contrib/operators/spark_sql_operator.py
index 246e808..f6cba59 100644
--- a/airflow/contrib/operators/spark_sql_operator.py
+++ b/airflow/contrib/operators/spark_sql_operator.py
@@ -58,6 +58,7 @@ class SparkSqlOperator(BaseOperator):
executor_cores=None,
executor_memory=None,
keytab=None,
+ principal=None,
master='yarn',
name='default-name',
num_executors=None,
@@ -72,6 +73,7 @@ class SparkSqlOperator(BaseOperator):
self._executor_cores = executor_cores
self._executor_memory = executor_memory
self._keytab = keytab
+ self._principal = principal
self._master = master
self._name = name
self._num_executors = num_executors
@@ -89,6 +91,7 @@ class SparkSqlOperator(BaseOperator):
executor_cores=self._executor_cores,
executor_memory=self._executor_memory,
keytab=self._keytab,
+ principal=self._principal,
name=self._name,
num_executors=self._num_executors,
master=self._master,
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32750601/tests/contrib/operators/test_spark_sql_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_spark_sql_operator.py b/tests/contrib/operators/test_spark_sql_operator.py
new file mode 100644
index 0000000..7e71ce6
--- /dev/null
+++ b/tests/contrib/operators/test_spark_sql_operator.py
@@ -0,0 +1,73 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+import unittest
+
+from airflow import DAG, configuration
+from airflow.contrib.operators.spark_sql_operator import SparkSqlOperator
+
+DEFAULT_DATE = datetime.datetime(2017, 1, 1)
+
+
+class TestSparkSqlOperator(unittest.TestCase):
+
+ _config = {
+ 'sql': 'SELECT 22',
+ 'conn_id': 'spark_special_conn_id',
+ 'total_executor_cores': 4,
+ 'executor_cores': 4,
+ 'executor_memory': '22g',
+ 'keytab': 'privileged_user.keytab',
+ 'principal': 'user/spark@airflow.org',
+ 'master': 'yarn-client',
+ 'name': 'special-application-name',
+ 'num_executors': 8,
+ 'yarn_queue': 'special-queue'
+ }
+
+ def setUp(self):
+ configuration.load_test_config()
+ args = {
+ 'owner': 'airflow',
+ 'start_date': DEFAULT_DATE
+ }
+ self.dag = DAG('test_dag_id', default_args=args)
+
+ def test_execute(self):
+ # Given / When
+ operator = SparkSqlOperator(
+ task_id='spark_sql_job',
+ dag=self.dag,
+ **self._config
+ )
+
+ self.assertEqual(self._config['sql'], operator._sql)
+ self.assertEqual(self._config['conn_id'], operator._conn_id)
+ self.assertEqual(self._config['total_executor_cores'], operator._total_executor_cores)
+ self.assertEqual(self._config['executor_cores'], operator._executor_cores)
+ self.assertEqual(self._config['executor_memory'], operator._executor_memory)
+ self.assertEqual(self._config['keytab'], operator._keytab)
+ self.assertEqual(self._config['principal'], operator._principal)
+ self.assertEqual(self._config['executor_memory'], operator._executor_memory)
+ self.assertEqual(self._config['keytab'], operator._keytab)
+ self.assertEqual(self._config['principal'], operator._principal)
+ self.assertEqual(self._config['master'], operator._master)
+ self.assertEqual(self._config['name'], operator._name)
+ self.assertEqual(self._config['num_executors'], operator._num_executors)
+ self.assertEqual(self._config['yarn_queue'], operator._yarn_queue)
+
+if __name__ == '__main__':
+ unittest.main()