Posted to commits@airflow.apache.org by bo...@apache.org on 2017/07/14 10:21:19 UTC

incubator-airflow git commit: [AIRFLOW-1255] Fix SparkSubmitHook output deadlock

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 194d1d6e5 -> 28aeed4aa


[AIRFLOW-1255] Fix SparkSubmitHook output deadlock

Refactor the SparkSubmitHook output processing to
avoid a deadlock. The prior implementation can
deadlock once the stderr pipe buffer fills:

1. Airflow drains stdout to completion before it
starts reading stderr;
2. spark-submit blocks as soon as a write to the
full stderr pipe cannot complete;
3. each process is now waiting on the other, and
neither will ever make progress.
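
A minimal, self-contained sketch of the hazard; the
child process here is a hypothetical stand-in for
spark-submit, and running the script hangs, which is
the point:

    import subprocess
    import sys

    # A child that writes far more to stderr than a pipe
    # buffer (typically 64 KiB on Linux) can hold before
    # it prints to stdout and exits.
    CHILD = [sys.executable, "-c", (
        "import sys\n"
        "for _ in range(100000):\n"
        "    sys.stderr.write('e' * 80 + '\\n')\n"
        "print('done')"
    )]

    sp = subprocess.Popen(CHILD,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE)

    # Deadlock: draining stdout to EOF first means nobody
    # reads stderr. The child blocks writing to the full
    # stderr pipe, never reaches its stdout print or its
    # exit, so stdout never hits EOF and readline blocks.
    for line in iter(sp.stdout.readline, b''):
        pass
    sp.wait()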

Closes #2438 from asnare/fix/spark-submit


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/28aeed4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/28aeed4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/28aeed4a

Branch: refs/heads/master
Commit: 28aeed4aa62acb295eda1cf94a40f7f643b650fb
Parents: 194d1d6
Author: Andrew Snare <an...@godatadriven.com>
Authored: Fri Jul 14 12:21:12 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Jul 14 12:21:12 2017 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/spark_submit_hook.py    | 19 +++++++------------
 tests/contrib/hooks/test_spark_submit_hook.py |  8 +++-----
 2 files changed, 10 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/28aeed4a/airflow/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index 88d547b..14e297b 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -26,7 +26,7 @@ log = logging.getLogger(__name__)
 class SparkSubmitHook(BaseHook):
     """
     This hook is a wrapper around the spark-submit binary to kick off a spark-submit job.
-    It requires that the "spark-submit" binary is in the PATH or the spark_home to be 
+    It requires that the "spark-submit" binary is in the PATH or the spark_home to be
     supplied.
     :param conf: Arbitrary Spark configuration properties
     :type conf: dict
@@ -211,21 +211,16 @@ class SparkSubmitHook(BaseHook):
         spark_submit_cmd = self._build_command(application)
         self._sp = subprocess.Popen(spark_submit_cmd,
                                     stdout=subprocess.PIPE,
-                                    stderr=subprocess.PIPE,
+                                    stderr=subprocess.STDOUT,
                                     **kwargs)
 
-        # Using two iterators here to support 'real-time' logging
-        sources = [self._sp.stdout, self._sp.stderr]
+        self._process_log(iter(self._sp.stdout.readline, b''))
+        returncode = self._sp.wait()
 
-        for source in sources:
-            self._process_log(iter(source.readline, b''))
-
-        output, stderr = self._sp.communicate()
-
-        if self._sp.returncode:
+        if returncode:
             raise AirflowException(
-                "Cannot execute: {}. Error code is: {}. Output: {}, Stderr: {}".format(
-                    spark_submit_cmd, self._sp.returncode, output, stderr
+                "Cannot execute: {}. Error code is: {}.".format(
+                    spark_submit_cmd, returncode
                 )
             )
 

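The fix reads a single merged stream instead, as the hunk
above shows. Sketched standalone, reusing the same
hypothetical child process as before:

    import subprocess
    import sys

    CHILD = [sys.executable, "-c", (
        "import sys\n"
        "for _ in range(100000):\n"
        "    sys.stderr.write('e' * 80 + '\\n')\n"
        "print('done')"
    )]

    # Safe: with stderr redirected into stdout there is only
    # one pipe to drain, so the child can never block on an
    # unread stderr buffer.
    sp = subprocess.Popen(CHILD,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT)

    # The hook hands this same iterator to _process_log().
    for line in iter(sp.stdout.readline, b''):
        pass

    returncode = sp.wait()
    if returncode:
        raise RuntimeError("Child failed: %d" % returncode)

Incidentally, subprocess.PIPE == -1 and subprocess.STDOUT == -2,
which is why the updated test below asserts stdout=-1 and
stderr=-2 in the mocked Popen call.
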
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/28aeed4a/tests/contrib/hooks/test_spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_spark_submit_hook.py b/tests/contrib/hooks/test_spark_submit_hook.py
index 98e959b..6b7da75 100644
--- a/tests/contrib/hooks/test_spark_submit_hook.py
+++ b/tests/contrib/hooks/test_spark_submit_hook.py
@@ -140,15 +140,14 @@ class TestSparkSubmitHook(unittest.TestCase):
         # Given
         mock_popen.return_value.stdout = StringIO(u'stdout')
         mock_popen.return_value.stderr = StringIO(u'stderr')
-        mock_popen.return_value.returncode = 0
-        mock_popen.return_value.communicate.return_value = [StringIO(u'stdout\nstdout'), StringIO(u'stderr\nstderr')]
+        mock_popen.return_value.wait.return_value = 0
 
         # When
         hook = SparkSubmitHook(conn_id='')
         hook.submit()
 
         # Then
-        self.assertEqual(mock_popen.mock_calls[0], call(['spark-submit', '--master', 'yarn', '--name', 'default-name', ''], stderr=-1, stdout=-1))
+        self.assertEqual(mock_popen.mock_calls[0], call(['spark-submit', '--master', 'yarn', '--name', 'default-name', ''], stdout=-1, stderr=-2))
 
     def test_resolve_connection_yarn_default(self):
         # Given
@@ -315,9 +314,8 @@ class TestSparkSubmitHook(unittest.TestCase):
         # Given
         mock_popen.return_value.stdout = StringIO(u'stdout')
         mock_popen.return_value.stderr = StringIO(u'stderr')
-        mock_popen.return_value.returncode = 0
         mock_popen.return_value.poll.return_value = None
-        mock_popen.return_value.communicate.return_value = [StringIO(u'stderr\nstderr'), StringIO(u'stderr\nstderr')]
+        mock_popen.return_value.wait.return_value = 0
         log_lines = [
             'SPARK_MAJOR_VERSION is set to 2, using Spark2',
             'WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable',