You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/07/14 10:21:19 UTC
incubator-airflow git commit: [AIRFLOW-1255] Fix SparkSubmitHook
output deadlock
Repository: incubator-airflow
Updated Branches:
refs/heads/master 194d1d6e5 -> 28aeed4aa
[AIRFLOW-1255] Fix SparkSubmitHook output deadlock
Refactor the SparkSubmitHook output processing to
avoid a deadlock.
The prior implementation deadlocks if the stderr
pipe buffer fills:
1. Airflow tries to drain stdout before starting
on stderr;
2. Spark blocks on its next write to stderr once the pipe buffer is full;
3. Each process is now waiting on the other,
so neither can ever make progress.
Closes #2438 from asnare/fix/spark-submit
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/28aeed4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/28aeed4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/28aeed4a
Branch: refs/heads/master
Commit: 28aeed4aa62acb295eda1cf94a40f7f643b650fb
Parents: 194d1d6
Author: Andrew Snare <an...@godatadriven.com>
Authored: Fri Jul 14 12:21:12 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Jul 14 12:21:12 2017 +0200
----------------------------------------------------------------------
airflow/contrib/hooks/spark_submit_hook.py | 19 +++++++------------
tests/contrib/hooks/test_spark_submit_hook.py | 8 +++-----
2 files changed, 10 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/28aeed4a/airflow/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index 88d547b..14e297b 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -26,7 +26,7 @@ log = logging.getLogger(__name__)
class SparkSubmitHook(BaseHook):
"""
This hook is a wrapper around the spark-submit binary to kick off a spark-submit job.
- It requires that the "spark-submit" binary is in the PATH or the spark_home to be
+ It requires that the "spark-submit" binary is in the PATH or the spark_home to be
supplied.
:param conf: Arbitrary Spark configuration properties
:type conf: dict
@@ -211,21 +211,16 @@ class SparkSubmitHook(BaseHook):
spark_submit_cmd = self._build_command(application)
self._sp = subprocess.Popen(spark_submit_cmd,
stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
**kwargs)
- # Using two iterators here to support 'real-time' logging
- sources = [self._sp.stdout, self._sp.stderr]
+ self._process_log(iter(self._sp.stdout.readline, b''))
+ returncode = self._sp.wait()
- for source in sources:
- self._process_log(iter(source.readline, b''))
-
- output, stderr = self._sp.communicate()
-
- if self._sp.returncode:
+ if returncode:
raise AirflowException(
- "Cannot execute: {}. Error code is: {}. Output: {}, Stderr: {}".format(
- spark_submit_cmd, self._sp.returncode, output, stderr
+ "Cannot execute: {}. Error code is: {}.".format(
+ spark_submit_cmd, returncode
)
)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/28aeed4a/tests/contrib/hooks/test_spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_spark_submit_hook.py b/tests/contrib/hooks/test_spark_submit_hook.py
index 98e959b..6b7da75 100644
--- a/tests/contrib/hooks/test_spark_submit_hook.py
+++ b/tests/contrib/hooks/test_spark_submit_hook.py
@@ -140,15 +140,14 @@ class TestSparkSubmitHook(unittest.TestCase):
# Given
mock_popen.return_value.stdout = StringIO(u'stdout')
mock_popen.return_value.stderr = StringIO(u'stderr')
- mock_popen.return_value.returncode = 0
- mock_popen.return_value.communicate.return_value = [StringIO(u'stdout\nstdout'), StringIO(u'stderr\nstderr')]
+ mock_popen.return_value.wait.return_value = 0
# When
hook = SparkSubmitHook(conn_id='')
hook.submit()
# Then
- self.assertEqual(mock_popen.mock_calls[0], call(['spark-submit', '--master', 'yarn', '--name', 'default-name', ''], stderr=-1, stdout=-1))
+ self.assertEqual(mock_popen.mock_calls[0], call(['spark-submit', '--master', 'yarn', '--name', 'default-name', ''], stdout=-1, stderr=-2))
def test_resolve_connection_yarn_default(self):
# Given
@@ -315,9 +314,8 @@ class TestSparkSubmitHook(unittest.TestCase):
# Given
mock_popen.return_value.stdout = StringIO(u'stdout')
mock_popen.return_value.stderr = StringIO(u'stderr')
- mock_popen.return_value.returncode = 0
mock_popen.return_value.poll.return_value = None
- mock_popen.return_value.communicate.return_value = [StringIO(u'stderr\nstderr'), StringIO(u'stderr\nstderr')]
+ mock_popen.return_value.wait.return_value = 0
log_lines = [
'SPARK_MAJOR_VERSION is set to 2, using Spark2',
'WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable',