You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/09/27 17:17:32 UTC
incubator-airflow git commit: [AIRFLOW-1647] Fix Spark-sql hook
Repository: incubator-airflow
Updated Branches:
refs/heads/master dca9ab3f8 -> 6a9dc8ad1
[AIRFLOW-1647] Fix Spark-sql hook
The logging in the spark-sql hook was not working and was causing an
exception, because the _process_log method is not available on this hook.
Write to the log handler directly, since the spark-sql hook can't run in
yarn-cluster mode. Add tests that verify the Popen call made by the hook.
Closes #2637 from Fokko/AIRFLOW-1647-Fix-spark-sql-hook
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6a9dc8ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6a9dc8ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6a9dc8ad
Branch: refs/heads/master
Commit: 6a9dc8ad183b906aa92f4a7197e078632c6e0ba6
Parents: dca9ab3
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Wed Sep 27 19:17:26 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Sep 27 19:17:26 2017 +0200
----------------------------------------------------------------------
airflow/contrib/hooks/spark_sql_hook.py | 5 ++--
tests/contrib/hooks/test_spark_sql_hook.py | 33 ++++++++++++++++++++++++-
2 files changed, 35 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6a9dc8ad/airflow/contrib/hooks/spark_sql_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_sql_hook.py b/airflow/contrib/hooks/spark_sql_hook.py
index 6973023..ea3ee8b 100644
--- a/airflow/contrib/hooks/spark_sql_hook.py
+++ b/airflow/contrib/hooks/spark_sql_hook.py
@@ -19,7 +19,7 @@ from airflow.exceptions import AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin
-class SparkSqlHook(BaseHook, LoggingMixin):
+class SparkSqlHook(BaseHook):
"""
This hook is a wrapper around the spark-sql binary. It requires that the
"spark-sql" binary is in the PATH.
@@ -138,7 +138,8 @@ class SparkSqlHook(BaseHook, LoggingMixin):
stderr=subprocess.STDOUT,
**kwargs)
- self._process_log(iter(self._sp.stdout.readline, b''))
+ for line in iter(self._sp.stdout.readline, ''):
+ self.log.info(line)
returncode = self._sp.wait()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6a9dc8ad/tests/contrib/hooks/test_spark_sql_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_spark_sql_hook.py b/tests/contrib/hooks/test_spark_sql_hook.py
index 145892c..720cf2e 100644
--- a/tests/contrib/hooks/test_spark_sql_hook.py
+++ b/tests/contrib/hooks/test_spark_sql_hook.py
@@ -12,12 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
+import six
import sys
import unittest
from io import StringIO
from itertools import dropwhile
-import mock
+from mock import patch, call
from airflow import configuration, models
from airflow.utils import db
@@ -76,6 +78,35 @@ class TestSparkSqlHook(unittest.TestCase):
if self._config['verbose']:
assert "--verbose" in cmd
+ @patch('airflow.contrib.hooks.spark_sql_hook.subprocess.Popen')
+ def test_spark_process_runcmd(self, mock_popen):
+ # Given
+ mock_popen.return_value.stdout = six.StringIO('Spark-sql communicates using stdout')
+ mock_popen.return_value.stderr = six.StringIO('stderr')
+ mock_popen.return_value.wait.return_value = 0
+
+ # When
+ hook = SparkSqlHook(
+ conn_id='spark_default',
+ sql='SELECT 1'
+ )
+ with patch.object(hook.log, 'debug') as mock_debug:
+ with patch.object(hook.log, 'info') as mock_info:
+ hook.run_query()
+ mock_debug.assert_called_with(
+ 'Spark-Sql cmd: %s',
+ ['spark-sql', '-e', 'SELECT 1', '--master', 'yarn', '--name', 'default-name', '--verbose', '--queue', 'default']
+ )
+ mock_info.assert_called_with(
+ 'Spark-sql communicates using stdout'
+ )
+
+ # Then
+ self.assertEqual(
+ mock_popen.mock_calls[0],
+ call(['spark-sql', '-e', 'SELECT 1', '--master', 'yarn', '--name', 'default-name', '--verbose', '--queue', 'default'], stderr=-2, stdout=-1)
+ )
+
if __name__ == '__main__':
unittest.main()