You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/07/17 07:14:55 UTC

incubator-airflow git commit: [AIRFLOW-1393] Enable Py3 tests in contrib/spark_submit_hook

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 0dd00291d -> 751e936ac


[AIRFLOW-1393] Enable Py3 tests in contrib/spark_submit_hook

The unit tests in
`tests/contrib/hooks/test_spark_submit_hook.py`
were skipped if run in Python3 because some test
cases loop forever
due to a mismatch/misunderstanding about bytes vs
string that didn't
matter under Py2 (i.e. the mocked data for
subprocess.Popen was
returning a String, but the actual Popen call
would return bytes.)

The fix is to use text-mode output and `six.StringIO` so that
the tests work on Py2
and Py3. Also we had to patch `subprocess.Popen` in
the right place so
the mocks are picked up.

Closes #2427 from ashb/enable-
spark_submit_hook_tests-py3


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/751e936a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/751e936a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/751e936a

Branch: refs/heads/master
Commit: 751e936ac2d18aec0315d2ed4f307c6b04ea431e
Parents: 0dd0029
Author: Ash Berlin-Taylor <as...@firemirror.com>
Authored: Mon Jul 17 09:14:49 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Mon Jul 17 09:14:49 2017 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/spark_submit_hook.py    |  6 ++++--
 tests/contrib/hooks/test_spark_submit_hook.py | 20 ++++++++------------
 2 files changed, 12 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/751e936a/airflow/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index 14e297b..a667753 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -212,9 +212,11 @@ class SparkSubmitHook(BaseHook):
         self._sp = subprocess.Popen(spark_submit_cmd,
                                     stdout=subprocess.PIPE,
                                     stderr=subprocess.STDOUT,
+                                    bufsize=-1,
+                                    universal_newlines=True,
                                     **kwargs)
 
-        self._process_log(iter(self._sp.stdout.readline, b''))
+        self._process_log(iter(self._sp.stdout.readline, ''))
         returncode = self._sp.wait()
 
         if returncode:
@@ -232,7 +234,7 @@ class SparkSubmitHook(BaseHook):
         """
         # Consume the iterator
         for line in itr:
-            line = line.decode('utf-8').strip()
+            line = line.strip()
             # If we run yarn cluster mode, we want to extract the application id from
             # the logs so we can kill the application when we stop it unexpectedly
             if self._is_yarn and self._connection['deploy_mode'] == 'cluster':

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/751e936a/tests/contrib/hooks/test_spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_spark_submit_hook.py b/tests/contrib/hooks/test_spark_submit_hook.py
index 6b7da75..826576f 100644
--- a/tests/contrib/hooks/test_spark_submit_hook.py
+++ b/tests/contrib/hooks/test_spark_submit_hook.py
@@ -12,9 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import six
 import sys
 import unittest
-from io import StringIO
 
 from airflow import configuration, models
 from airflow.utils import db
@@ -63,10 +63,6 @@ class TestSparkSubmitHook(unittest.TestCase):
 
     def setUp(self):
 
-        if sys.version_info[0] == 3:
-            raise unittest.SkipTest('TestSparkSubmitHook won\'t work with '
-                                    'python3. No need to test anything here')
-
         configuration.load_test_config()
         db.merge_conn(
             models.Connection(
@@ -135,11 +131,11 @@ class TestSparkSubmitHook(unittest.TestCase):
         ]
         self.assertEquals(expected_build_cmd, cmd)
 
-    @patch('subprocess.Popen')
+    @patch('airflow.contrib.hooks.spark_submit_hook.subprocess.Popen')
     def test_spark_process_runcmd(self, mock_popen):
         # Given
-        mock_popen.return_value.stdout = StringIO(u'stdout')
-        mock_popen.return_value.stderr = StringIO(u'stderr')
+        mock_popen.return_value.stdout = six.StringIO('stdout')
+        mock_popen.return_value.stderr = six.StringIO('stderr')
         mock_popen.return_value.wait.return_value = 0
 
         # When
@@ -147,7 +143,7 @@ class TestSparkSubmitHook(unittest.TestCase):
         hook.submit()
 
         # Then
-        self.assertEqual(mock_popen.mock_calls[0], call(['spark-submit', '--master', 'yarn', '--name', 'default-name', ''], stdout=-1, stderr=-2))
+        self.assertEqual(mock_popen.mock_calls[0], call(['spark-submit', '--master', 'yarn', '--name', 'default-name', ''], stderr=-2, stdout=-1, universal_newlines=True, bufsize=-1))
 
     def test_resolve_connection_yarn_default(self):
         # Given
@@ -309,11 +305,11 @@ class TestSparkSubmitHook(unittest.TestCase):
 
         self.assertEqual(hook._yarn_application_id, 'application_1486558679801_1820')
 
-    @patch('subprocess.Popen')
+    @patch('airflow.contrib.hooks.spark_submit_hook.subprocess.Popen')
     def test_spark_process_on_kill(self, mock_popen):
         # Given
-        mock_popen.return_value.stdout = StringIO(u'stdout')
-        mock_popen.return_value.stderr = StringIO(u'stderr')
+        mock_popen.return_value.stdout = six.StringIO('stdout')
+        mock_popen.return_value.stderr = six.StringIO('stderr')
         mock_popen.return_value.poll.return_value = None
         mock_popen.return_value.wait.return_value = 0
         log_lines = [