You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/09/27 17:17:32 UTC

incubator-airflow git commit: [AIRFLOW-1647] Fix Spark-sql hook

Repository: incubator-airflow
Updated Branches:
  refs/heads/master dca9ab3f8 -> 6a9dc8ad1


[AIRFLOW-1647] Fix Spark-sql hook

The logging in the spark-sql hook was not working and caused an
exception because _process_log is not available. Write to the log
handler directly, because the spark-sql hook can't run in
yarn-cluster mode. Add tests which verify the Popen call of the hook.

Closes #2637 from Fokko/AIRFLOW-1647-Fix-spark-sql-hook


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6a9dc8ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6a9dc8ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6a9dc8ad

Branch: refs/heads/master
Commit: 6a9dc8ad183b906aa92f4a7197e078632c6e0ba6
Parents: dca9ab3
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Wed Sep 27 19:17:26 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Sep 27 19:17:26 2017 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/spark_sql_hook.py    |  5 ++--
 tests/contrib/hooks/test_spark_sql_hook.py | 33 ++++++++++++++++++++++++-
 2 files changed, 35 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6a9dc8ad/airflow/contrib/hooks/spark_sql_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_sql_hook.py b/airflow/contrib/hooks/spark_sql_hook.py
index 6973023..ea3ee8b 100644
--- a/airflow/contrib/hooks/spark_sql_hook.py
+++ b/airflow/contrib/hooks/spark_sql_hook.py
@@ -19,7 +19,7 @@ from airflow.exceptions import AirflowException
 from airflow.utils.log.logging_mixin import LoggingMixin
 
 
-class SparkSqlHook(BaseHook, LoggingMixin):
+class SparkSqlHook(BaseHook):
     """
     This hook is a wrapper around the spark-sql binary. It requires that the
     "spark-sql" binary is in the PATH.
@@ -138,7 +138,8 @@ class SparkSqlHook(BaseHook, LoggingMixin):
                                     stderr=subprocess.STDOUT,
                                     **kwargs)
 
-        self._process_log(iter(self._sp.stdout.readline, b''))
+        for line in iter(self._sp.stdout.readline, ''):
+            self.log.info(line)
 
         returncode = self._sp.wait()
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6a9dc8ad/tests/contrib/hooks/test_spark_sql_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_spark_sql_hook.py b/tests/contrib/hooks/test_spark_sql_hook.py
index 145892c..720cf2e 100644
--- a/tests/contrib/hooks/test_spark_sql_hook.py
+++ b/tests/contrib/hooks/test_spark_sql_hook.py
@@ -12,12 +12,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
+import six
 import sys
 import unittest
 from io import StringIO
 from itertools import dropwhile
 
-import mock
+from mock import patch, call
 
 from airflow import configuration, models
 from airflow.utils import db
@@ -76,6 +78,35 @@ class TestSparkSqlHook(unittest.TestCase):
         if self._config['verbose']:
             assert "--verbose" in cmd
 
+    @patch('airflow.contrib.hooks.spark_sql_hook.subprocess.Popen')
+    def test_spark_process_runcmd(self, mock_popen):
+        # Given
+        mock_popen.return_value.stdout = six.StringIO('Spark-sql communicates using stdout')
+        mock_popen.return_value.stderr = six.StringIO('stderr')
+        mock_popen.return_value.wait.return_value = 0
+
+        # When
+        hook = SparkSqlHook(
+            conn_id='spark_default',
+            sql='SELECT 1'
+        )
+        with patch.object(hook.log, 'debug') as mock_debug:
+            with patch.object(hook.log, 'info') as mock_info:
+                hook.run_query()
+                mock_debug.assert_called_with(
+                    'Spark-Sql cmd: %s',
+                    ['spark-sql', '-e', 'SELECT 1', '--master', 'yarn', '--name', 'default-name', '--verbose', '--queue', 'default']
+                )
+                mock_info.assert_called_with(
+                    'Spark-sql communicates using stdout'
+                )
+
+        # Then
+        self.assertEqual(
+            mock_popen.mock_calls[0],
+            call(['spark-sql', '-e', 'SELECT 1', '--master', 'yarn', '--name', 'default-name', '--verbose', '--queue', 'default'], stderr=-2, stdout=-1)
+        )
+
 
 if __name__ == '__main__':
     unittest.main()