You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/06/07 07:09:57 UTC

incubator-airflow git commit: [AIRFLOW-1192] Some enhancements to qubole_operator

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 6b890d157 -> 6be02475f


[AIRFLOW-1192] Some enhancements to qubole_operator

1. Upgrade qds_sdk version to latest
2. Add support to run Zeppelin Notebooks
3. Move out initialization of QuboleHook from __init__()

Closes #2322 from msumit/AIRFLOW-1192


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6be02475
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6be02475
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6be02475

Branch: refs/heads/master
Commit: 6be02475f80c2f493e640272ab5344ca686204a0
Parents: 6b890d1
Author: Sumit Maheshwari <su...@gmail.com>
Authored: Wed Jun 7 09:09:50 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jun 7 09:09:50 2017 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/qubole_hook.py            |  4 +-
 airflow/contrib/operators/qubole_operator.py    | 30 ++++---
 scripts/ci/requirements.txt                     |  1 +
 setup.py                                        |  2 +-
 tests/contrib/operators/test_qubole_operator.py | 94 ++++++++++++++++++++
 5 files changed, 115 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6be02475/airflow/contrib/hooks/qubole_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py
index 93db72a..c51a757 100755
--- a/airflow/contrib/hooks/qubole_hook.py
+++ b/airflow/contrib/hooks/qubole_hook.py
@@ -42,7 +42,7 @@ COMMAND_CLASSES = {
     "dbimportcmd": DbImportCommand
 }
 
-HYPHEN_ARGS = ['cluster_label', 'app_id']
+HYPHEN_ARGS = ['cluster_label', 'app_id', 'note_id']
 
 POSITIONAL_ARGS = ['sub_command', 'parameters']
 
@@ -57,7 +57,7 @@ COMMAND_ARGS = {
                'name'],
     'dbtapquerycmd': ['db_tap_id', 'query', 'macros', 'tags', 'name'],
     'sparkcmd': ['program', 'cmdline', 'sql', 'script_location', 'macros', 'tags',
-                 'cluster_label', 'language', 'app_id', 'name', 'arguments',
+                 'cluster_label', 'language', 'app_id', 'name', 'arguments', 'note_id',
                  'user_program_arguments'],
     'dbexportcmd': ['mode', 'hive_table', 'partition_spec', 'dbtap_id', 'db_table',
                     'db_update_mode', 'db_update_keys', 'export_dir',

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6be02475/airflow/contrib/operators/qubole_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/qubole_operator.py b/airflow/contrib/operators/qubole_operator.py
index 623899d..a5e9f5e 100755
--- a/airflow/contrib/operators/qubole_operator.py
+++ b/airflow/contrib/operators/qubole_operator.py
@@ -102,8 +102,9 @@ class QuboleOperator(BaseOperator):
         ``sub_command``, ``script``, ``files``, ``archives``, ``program``, ``cmdline``,
         ``sql``, ``where_clause``, ``extract_query``, ``boundary_query``, ``macros``,
         ``tags``, ``name``, ``parameters``, ``dbtap_id``, ``hive_table``, ``db_table``,
-        ``split_column``, ``db_update_keys``, ``export_dir``, ``partition_spec``. You
-        can also use ``.txt`` files for template driven use cases.
+        ``split_column``, ``note_id``, ``db_update_keys``, ``export_dir``,
+        ``partition_spec``, ``qubole_conn_id``, ``arguments``, ``user_program_arguments``.
+         You can also use ``.txt`` files for template driven use cases.
 
     .. note:: In QuboleOperator there is a default handler for task failures and retries,
         which generally kills the command running at QDS for the corresponding task
@@ -114,8 +115,10 @@ class QuboleOperator(BaseOperator):
     template_fields = ('query', 'script_location', 'sub_command', 'script', 'files',
                        'archives', 'program', 'cmdline', 'sql', 'where_clause', 'tags',
                        'extract_query', 'boundary_query', 'macros', 'name', 'parameters',
-                       'dbtap_id', 'hive_table', 'db_table', 'split_column',
-                       'db_update_keys', 'export_dir', 'partition_spec')
+                       'dbtap_id', 'hive_table', 'db_table', 'split_column', 'note_id',
+                       'db_update_keys', 'export_dir', 'partition_spec', 'qubole_conn_id',
+                       'arguments', 'user_program_arguments')
+
     template_ext = ('.txt',)
     ui_color = '#3064A1'
     ui_fgcolor = '#fff'
@@ -125,7 +128,6 @@ class QuboleOperator(BaseOperator):
         self.args = args
         self.kwargs = kwargs
         self.kwargs['qubole_conn_id'] = qubole_conn_id
-        self.hook = QuboleHook(*self.args, **self.kwargs)
         super(QuboleOperator, self).__init__(*args, **kwargs)
 
         if self.on_failure_callback is None:
@@ -135,21 +137,23 @@ class QuboleOperator(BaseOperator):
             self.on_retry_callback = QuboleHook.handle_failure_retry
 
     def execute(self, context):
-        # Reinitiating the hook, as some template fields might have changed
-        self.hook = QuboleHook(*self.args, **self.kwargs)
-        return self.hook.execute(context)
+        return self.get_hook().execute(context)
 
-    def on_kill(self, ti):
-        self.hook.kill(ti)
+    def on_kill(self, ti=None):
+        self.get_hook().kill(ti)
 
     def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True):
-        return self.hook.get_results(ti, fp, inline, delim, fetch)
+        return self.get_hook().get_results(ti, fp, inline, delim, fetch)
 
     def get_log(self, ti):
-        return self.hook.get_log(ti)
+        return self.get_hook().get_log(ti)
 
     def get_jobs_id(self, ti):
-        return self.hook.get_jobs_id(ti)
+        return self.get_hook().get_jobs_id(ti)
+
+    def get_hook(self):
+        # Reinitiating the hook, as some template fields might have changed
+        return QuboleHook(*self.args, **self.kwargs)
 
     def __getattribute__(self, name):
         if name in QuboleOperator.template_fields:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6be02475/scripts/ci/requirements.txt
----------------------------------------------------------------------
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index bf2d386..5b0612e 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -73,6 +73,7 @@ PyOpenSSL
 PySmbClient
 python-daemon
 python-dateutil
+qds-sdk>=1.9.6
 redis
 rednose
 requests

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6be02475/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 71c3f49..527ee5d 100644
--- a/setup.py
+++ b/setup.py
@@ -179,7 +179,7 @@ password = [
     'flask-bcrypt>=0.7.1',
 ]
 github_enterprise = ['Flask-OAuthlib>=0.9.1']
-qds = ['qds-sdk>=1.9.0']
+qds = ['qds-sdk>=1.9.6']
 cloudant = ['cloudant>=0.5.9,<2.0'] # major update coming soon, clamp to 0.x
 redis = ['redis>=2.10.5']
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6be02475/tests/contrib/operators/test_qubole_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_qubole_operator.py b/tests/contrib/operators/test_qubole_operator.py
new file mode 100644
index 0000000..0e6e13d
--- /dev/null
+++ b/tests/contrib/operators/test_qubole_operator.py
@@ -0,0 +1,94 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+from datetime import datetime
+
+from airflow.models import DAG, Connection
+from airflow.utils import db
+
+from airflow.contrib.hooks.qubole_hook import QuboleHook
+from airflow.contrib.operators.qubole_operator import QuboleOperator
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+DAG_ID="qubole_test_dag"
+TASK_ID="test_task"
+DEFAULT_CONN="qubole_default"
+TEMPLATE_CONN = "my_conn_id"
+DEFAULT_DATE = datetime(2017, 1, 1)
+
+
+class QuboleOperatorTest(unittest.TestCase):
+    def setUp(self):
+        db.merge_conn(
+            Connection(conn_id=DEFAULT_CONN, conn_type='HTTP'))
+
+    def test_init_with_default_connection(self):
+        op = QuboleOperator(task_id=TASK_ID)
+        self.assertEqual(op.task_id, TASK_ID)
+        self.assertEqual(op.qubole_conn_id, DEFAULT_CONN)
+
+    def test_init_with_template_connection(self):
+        dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
+
+        with dag:
+            task = QuboleOperator(task_id=TASK_ID, dag=dag,
+                                  qubole_conn_id="{{ dag_run.conf['qubole_conn_id'] }}")
+
+        result = task.render_template('qubole_conn_id', "{{ qubole_conn_id }}",
+                                      {'qubole_conn_id' : TEMPLATE_CONN})
+        self.assertEqual(task.task_id, TASK_ID)
+        self.assertEqual(result, TEMPLATE_CONN)
+
+    def test_get_hook(self):
+        dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
+
+        with dag:
+            task = QuboleOperator(task_id=TASK_ID, command_type='hivecmd', dag=dag)
+
+        hook = task.get_hook()
+        self.assertEqual(hook.__class__, QuboleHook)
+
+    def test_hyphen_args_note_id(self):
+        dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
+
+        with dag:
+            task = QuboleOperator(task_id=TASK_ID, command_type='sparkcmd',
+                                  note_id="123", dag=dag)
+        self.assertEqual(task.get_hook().create_cmd_args({'run_id':'dummy'})[0],
+                         "--note-id=123")
+
+    def test_position_args_parameters(self):
+        dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
+
+        with dag:
+            task = QuboleOperator(task_id=TASK_ID, command_type='pigcmd',
+                          parameters="key1=value1 key2=value2", dag=dag)
+
+        self.assertEqual(task.get_hook().create_cmd_args({'run_id':'dummy'})[1],
+                         "key1=value1")
+        self.assertEqual(task.get_hook().create_cmd_args({'run_id':'dummy'})[2],
+                         "key2=value2")
+
+
+
+