You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/07/17 07:14:55 UTC
incubator-airflow git commit: [AIRFLOW-1393] Enable
Py3 tests in contrib/spark_submit_hook
Repository: incubator-airflow
Updated Branches:
refs/heads/master 0dd00291d -> 751e936ac
[AIRFLOW-1393] Enable Py3 tests in contrib/spark_submit_hook
The unit tests in
`tests/contrib/hooks/test_spark_submit_hook.py`
were skipped if run in Python3 because some test
cases loop forever
due to a mismatch/misunderstanding about bytes vs
string that didn't
matter under Py2 (i.e. the mocked data for
subprocess.Popen was
returning a String, but the actual Popen call
would return bytes.)
The fix is to use `universal_newlines` and `six.StringIO` so that
the tests work on Py2
and Py3. Also we had to patch `subprocess.Popen` in
the right place so
the mocks are picked up.
Closes #2427 from ashb/enable-
spark_submit_hook_tests-py3
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/751e936a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/751e936a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/751e936a
Branch: refs/heads/master
Commit: 751e936ac2d18aec0315d2ed4f307c6b04ea431e
Parents: 0dd0029
Author: Ash Berlin-Taylor <as...@firemirror.com>
Authored: Mon Jul 17 09:14:49 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Jul 17 09:14:49 2017 +0200
----------------------------------------------------------------------
airflow/contrib/hooks/spark_submit_hook.py | 6 ++++--
tests/contrib/hooks/test_spark_submit_hook.py | 20 ++++++++------------
2 files changed, 12 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/751e936a/airflow/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index 14e297b..a667753 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -212,9 +212,11 @@ class SparkSubmitHook(BaseHook):
self._sp = subprocess.Popen(spark_submit_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
+ bufsize=-1,
+ universal_newlines=True,
**kwargs)
- self._process_log(iter(self._sp.stdout.readline, b''))
+ self._process_log(iter(self._sp.stdout.readline, ''))
returncode = self._sp.wait()
if returncode:
@@ -232,7 +234,7 @@ class SparkSubmitHook(BaseHook):
"""
# Consume the iterator
for line in itr:
- line = line.decode('utf-8').strip()
+ line = line.strip()
# If we run yarn cluster mode, we want to extract the application id from
# the logs so we can kill the application when we stop it unexpectedly
if self._is_yarn and self._connection['deploy_mode'] == 'cluster':
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/751e936a/tests/contrib/hooks/test_spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_spark_submit_hook.py b/tests/contrib/hooks/test_spark_submit_hook.py
index 6b7da75..826576f 100644
--- a/tests/contrib/hooks/test_spark_submit_hook.py
+++ b/tests/contrib/hooks/test_spark_submit_hook.py
@@ -12,9 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+import six
import sys
import unittest
-from io import StringIO
from airflow import configuration, models
from airflow.utils import db
@@ -63,10 +63,6 @@ class TestSparkSubmitHook(unittest.TestCase):
def setUp(self):
- if sys.version_info[0] == 3:
- raise unittest.SkipTest('TestSparkSubmitHook won\'t work with '
- 'python3. No need to test anything here')
-
configuration.load_test_config()
db.merge_conn(
models.Connection(
@@ -135,11 +131,11 @@ class TestSparkSubmitHook(unittest.TestCase):
]
self.assertEquals(expected_build_cmd, cmd)
- @patch('subprocess.Popen')
+ @patch('airflow.contrib.hooks.spark_submit_hook.subprocess.Popen')
def test_spark_process_runcmd(self, mock_popen):
# Given
- mock_popen.return_value.stdout = StringIO(u'stdout')
- mock_popen.return_value.stderr = StringIO(u'stderr')
+ mock_popen.return_value.stdout = six.StringIO('stdout')
+ mock_popen.return_value.stderr = six.StringIO('stderr')
mock_popen.return_value.wait.return_value = 0
# When
@@ -147,7 +143,7 @@ class TestSparkSubmitHook(unittest.TestCase):
hook.submit()
# Then
- self.assertEqual(mock_popen.mock_calls[0], call(['spark-submit', '--master', 'yarn', '--name', 'default-name', ''], stdout=-1, stderr=-2))
+ self.assertEqual(mock_popen.mock_calls[0], call(['spark-submit', '--master', 'yarn', '--name', 'default-name', ''], stderr=-2, stdout=-1, universal_newlines=True, bufsize=-1))
def test_resolve_connection_yarn_default(self):
# Given
@@ -309,11 +305,11 @@ class TestSparkSubmitHook(unittest.TestCase):
self.assertEqual(hook._yarn_application_id, 'application_1486558679801_1820')
- @patch('subprocess.Popen')
+ @patch('airflow.contrib.hooks.spark_submit_hook.subprocess.Popen')
def test_spark_process_on_kill(self, mock_popen):
# Given
- mock_popen.return_value.stdout = StringIO(u'stdout')
- mock_popen.return_value.stderr = StringIO(u'stderr')
+ mock_popen.return_value.stdout = six.StringIO('stdout')
+ mock_popen.return_value.stderr = six.StringIO('stderr')
mock_popen.return_value.poll.return_value = None
mock_popen.return_value.wait.return_value = 0
log_lines = [