You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/09/06 11:37:37 UTC

incubator-airflow git commit: [AIRFLOW-1562] Spark-sql logging contains deadlock

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 9df0ac64c -> 32750601a


[AIRFLOW-1562] Spark-sql logging contains deadlock

Logging in SparkSqlOperator does not work as intended. Spark-sql
internally redirects all logs to stdout (including stderr), which
causes the current two-iterator logging to get stuck on the stderr
pipe. This situation can lead to a deadlock: the stderr output can
grow too large for the pipe, at which point it blocks until it is
consumed, which only happens when the process ends, so the process
stalls.

Closes #2563 from Fokko/AIRFLOW-1562-Spark-sql-
loggin-contains-deadlock


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/32750601
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/32750601
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/32750601

Branch: refs/heads/master
Commit: 32750601ad0a422283613bf7fccff8eb5407bc9c
Parents: 9df0ac6
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Wed Sep 6 13:37:31 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Sep 6 13:37:31 2017 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/spark_sql_hook.py         | 33 ++++-----
 airflow/contrib/operators/spark_sql_operator.py |  3 +
 .../operators/test_spark_sql_operator.py        | 73 ++++++++++++++++++++
 3 files changed, 93 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32750601/airflow/contrib/hooks/spark_sql_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_sql_hook.py b/airflow/contrib/hooks/spark_sql_hook.py
index 8d73f60..d7bef7b 100644
--- a/airflow/contrib/hooks/spark_sql_hook.py
+++ b/airflow/contrib/hooks/spark_sql_hook.py
@@ -58,6 +58,7 @@ class SparkSqlHook(BaseHook):
                  executor_cores=None,
                  executor_memory=None,
                  keytab=None,
+                 principal=None,
                  master='yarn',
                  name='default-name',
                  num_executors=None,
@@ -71,6 +72,7 @@ class SparkSqlHook(BaseHook):
         self._executor_cores = executor_cores
         self._executor_memory = executor_memory
         self._keytab = keytab
+        self._principal = principal
         self._master = master
         self._name = name
         self._num_executors = num_executors
@@ -101,6 +103,8 @@ class SparkSqlHook(BaseHook):
             connection_cmd += ["--executor-memory", self._executor_memory]
         if self._keytab:
             connection_cmd += ["--keytab", self._keytab]
+        if self._principal:
+            connection_cmd += ["--principal", self._principal]
         if self._num_executors:
             connection_cmd += ["--num-executors", str(self._num_executors)]
         if self._sql:
@@ -130,25 +134,22 @@ class SparkSqlHook(BaseHook):
         :param cmd: command to remotely execute
         :param kwargs: extra arguments to Popen (see subprocess.Popen)
         """
-        prefixed_cmd = self._prepare_command(cmd)
-        self._sp = subprocess.Popen(prefixed_cmd,
+        spark_sql_cmd = self._prepare_command(cmd)
+        self._sp = subprocess.Popen(spark_sql_cmd,
                                     stdout=subprocess.PIPE,
-                                    stderr=subprocess.PIPE,
+                                    stderr=subprocess.STDOUT,
                                     **kwargs)
-        # using two iterators here to support 'real-time' logging
-        for line in iter(self._sp.stdout.readline, b''):
-            line = line.decode('utf-8').strip()
-            logging.info(line)
-        for line in iter(self._sp.stderr.readline, b''):
-            line = line.decode('utf-8').strip()
-            logging.info(line)
-        output, stderr = self._sp.communicate()
 
-        if self._sp.returncode:
-            raise AirflowException("Cannot execute {} on {}. Error code is: "
-                                   "{}. Output: {}, Stderr: {}"
-                                   .format(cmd, self._conn.host,
-                                           self._sp.returncode, output, stderr))
+        self._process_log(iter(self._sp.stdout.readline, b''))
+
+        returncode = self._sp.wait()
+
+        if returncode:
+            raise AirflowException(
+                "Cannot execute {} on {}. Process exit code: {}.".format(
+                    cmd, self._conn.host, returncode
+                )
+            )
 
     def kill(self):
         if self._sp and self._sp.poll() is None:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32750601/airflow/contrib/operators/spark_sql_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/spark_sql_operator.py b/airflow/contrib/operators/spark_sql_operator.py
index 246e808..f6cba59 100644
--- a/airflow/contrib/operators/spark_sql_operator.py
+++ b/airflow/contrib/operators/spark_sql_operator.py
@@ -58,6 +58,7 @@ class SparkSqlOperator(BaseOperator):
                  executor_cores=None,
                  executor_memory=None,
                  keytab=None,
+                 principal=None,
                  master='yarn',
                  name='default-name',
                  num_executors=None,
@@ -72,6 +73,7 @@ class SparkSqlOperator(BaseOperator):
         self._executor_cores = executor_cores
         self._executor_memory = executor_memory
         self._keytab = keytab
+        self._principal = principal
         self._master = master
         self._name = name
         self._num_executors = num_executors
@@ -89,6 +91,7 @@ class SparkSqlOperator(BaseOperator):
                                   executor_cores=self._executor_cores,
                                   executor_memory=self._executor_memory,
                                   keytab=self._keytab,
+                                  principal=self._principal,
                                   name=self._name,
                                   num_executors=self._num_executors,
                                   master=self._master,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/32750601/tests/contrib/operators/test_spark_sql_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_spark_sql_operator.py b/tests/contrib/operators/test_spark_sql_operator.py
new file mode 100644
index 0000000..7e71ce6
--- /dev/null
+++ b/tests/contrib/operators/test_spark_sql_operator.py
@@ -0,0 +1,73 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+import unittest
+
+from airflow import DAG, configuration
+from airflow.contrib.operators.spark_sql_operator import SparkSqlOperator
+
+DEFAULT_DATE = datetime.datetime(2017, 1, 1)
+
+
+class TestSparkSqlOperator(unittest.TestCase):
+
+    _config = {
+        'sql': 'SELECT 22',
+        'conn_id': 'spark_special_conn_id',
+        'total_executor_cores': 4,
+        'executor_cores': 4,
+        'executor_memory': '22g',
+        'keytab': 'privileged_user.keytab',
+        'principal': 'user/spark@airflow.org',
+        'master': 'yarn-client',
+        'name': 'special-application-name',
+        'num_executors': 8,
+        'yarn_queue': 'special-queue'
+    }
+
+    def setUp(self):
+        configuration.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': DEFAULT_DATE
+        }
+        self.dag = DAG('test_dag_id', default_args=args)
+
+    def test_execute(self):
+        # Given / When
+        operator = SparkSqlOperator(
+            task_id='spark_sql_job',
+            dag=self.dag,
+            **self._config
+        )
+
+        self.assertEqual(self._config['sql'], operator._sql)
+        self.assertEqual(self._config['conn_id'], operator._conn_id)
+        self.assertEqual(self._config['total_executor_cores'], operator._total_executor_cores)
+        self.assertEqual(self._config['executor_cores'], operator._executor_cores)
+        self.assertEqual(self._config['executor_memory'], operator._executor_memory)
+        self.assertEqual(self._config['keytab'], operator._keytab)
+        self.assertEqual(self._config['principal'], operator._principal)
+        self.assertEqual(self._config['executor_memory'], operator._executor_memory)
+        self.assertEqual(self._config['keytab'], operator._keytab)
+        self.assertEqual(self._config['principal'], operator._principal)
+        self.assertEqual(self._config['master'], operator._master)
+        self.assertEqual(self._config['name'], operator._name)
+        self.assertEqual(self._config['num_executors'], operator._num_executors)
+        self.assertEqual(self._config['yarn_queue'], operator._yarn_queue)
+
+if __name__ == '__main__':
+    unittest.main()