You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/06/07 07:09:57 UTC
incubator-airflow git commit: [AIRFLOW-1192] Some enhancements to
qubole_operator
Repository: incubator-airflow
Updated Branches:
refs/heads/master 6b890d157 -> 6be02475f
[AIRFLOW-1192] Some enhancements to qubole_operator
1. Upgrade qds_sdk version to latest
2. Add support to run Zeppelin Notebooks
3. Move out initialization of QuboleHook from
__init__()
Closes #2322 from msumit/AIRFLOW-1192
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6be02475
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6be02475
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6be02475
Branch: refs/heads/master
Commit: 6be02475f80c2f493e640272ab5344ca686204a0
Parents: 6b890d1
Author: Sumit Maheshwari <su...@gmail.com>
Authored: Wed Jun 7 09:09:50 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Jun 7 09:09:50 2017 +0200
----------------------------------------------------------------------
airflow/contrib/hooks/qubole_hook.py | 4 +-
airflow/contrib/operators/qubole_operator.py | 30 ++++---
scripts/ci/requirements.txt | 1 +
setup.py | 2 +-
tests/contrib/operators/test_qubole_operator.py | 94 ++++++++++++++++++++
5 files changed, 115 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6be02475/airflow/contrib/hooks/qubole_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py
index 93db72a..c51a757 100755
--- a/airflow/contrib/hooks/qubole_hook.py
+++ b/airflow/contrib/hooks/qubole_hook.py
@@ -42,7 +42,7 @@ COMMAND_CLASSES = {
"dbimportcmd": DbImportCommand
}
-HYPHEN_ARGS = ['cluster_label', 'app_id']
+HYPHEN_ARGS = ['cluster_label', 'app_id', 'note_id']
POSITIONAL_ARGS = ['sub_command', 'parameters']
@@ -57,7 +57,7 @@ COMMAND_ARGS = {
'name'],
'dbtapquerycmd': ['db_tap_id', 'query', 'macros', 'tags', 'name'],
'sparkcmd': ['program', 'cmdline', 'sql', 'script_location', 'macros', 'tags',
- 'cluster_label', 'language', 'app_id', 'name', 'arguments',
+ 'cluster_label', 'language', 'app_id', 'name', 'arguments', 'note_id',
'user_program_arguments'],
'dbexportcmd': ['mode', 'hive_table', 'partition_spec', 'dbtap_id', 'db_table',
'db_update_mode', 'db_update_keys', 'export_dir',
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6be02475/airflow/contrib/operators/qubole_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/qubole_operator.py b/airflow/contrib/operators/qubole_operator.py
index 623899d..a5e9f5e 100755
--- a/airflow/contrib/operators/qubole_operator.py
+++ b/airflow/contrib/operators/qubole_operator.py
@@ -102,8 +102,9 @@ class QuboleOperator(BaseOperator):
``sub_command``, ``script``, ``files``, ``archives``, ``program``, ``cmdline``,
``sql``, ``where_clause``, ``extract_query``, ``boundary_query``, ``macros``,
``tags``, ``name``, ``parameters``, ``dbtap_id``, ``hive_table``, ``db_table``,
- ``split_column``, ``db_update_keys``, ``export_dir``, ``partition_spec``. You
- can also use ``.txt`` files for template driven use cases.
+ ``split_column``, ``note_id``, ``db_update_keys``, ``export_dir``,
+ ``partition_spec``, ``qubole_conn_id``, ``arguments``, ``user_program_arguments``.
+ You can also use ``.txt`` files for template driven use cases.
.. note:: In QuboleOperator there is a default handler for task failures and retries,
which generally kills the command running at QDS for the corresponding task
@@ -114,8 +115,10 @@ class QuboleOperator(BaseOperator):
template_fields = ('query', 'script_location', 'sub_command', 'script', 'files',
'archives', 'program', 'cmdline', 'sql', 'where_clause', 'tags',
'extract_query', 'boundary_query', 'macros', 'name', 'parameters',
- 'dbtap_id', 'hive_table', 'db_table', 'split_column',
- 'db_update_keys', 'export_dir', 'partition_spec')
+ 'dbtap_id', 'hive_table', 'db_table', 'split_column', 'note_id',
+ 'db_update_keys', 'export_dir', 'partition_spec', 'qubole_conn_id',
+ 'arguments', 'user_program_arguments')
+
template_ext = ('.txt',)
ui_color = '#3064A1'
ui_fgcolor = '#fff'
@@ -125,7 +128,6 @@ class QuboleOperator(BaseOperator):
self.args = args
self.kwargs = kwargs
self.kwargs['qubole_conn_id'] = qubole_conn_id
- self.hook = QuboleHook(*self.args, **self.kwargs)
super(QuboleOperator, self).__init__(*args, **kwargs)
if self.on_failure_callback is None:
@@ -135,21 +137,23 @@ class QuboleOperator(BaseOperator):
self.on_retry_callback = QuboleHook.handle_failure_retry
def execute(self, context):
- # Reinitiating the hook, as some template fields might have changed
- self.hook = QuboleHook(*self.args, **self.kwargs)
- return self.hook.execute(context)
+ return self.get_hook().execute(context)
- def on_kill(self, ti):
- self.hook.kill(ti)
+ def on_kill(self, ti=None):
+ self.get_hook().kill(ti)
def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True):
- return self.hook.get_results(ti, fp, inline, delim, fetch)
+ return self.get_hook().get_results(ti, fp, inline, delim, fetch)
def get_log(self, ti):
- return self.hook.get_log(ti)
+ return self.get_hook().get_log(ti)
def get_jobs_id(self, ti):
- return self.hook.get_jobs_id(ti)
+ return self.get_hook().get_jobs_id(ti)
+
+ def get_hook(self):
+ # Reinitiating the hook, as some template fields might have changed
+ return QuboleHook(*self.args, **self.kwargs)
def __getattribute__(self, name):
if name in QuboleOperator.template_fields:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6be02475/scripts/ci/requirements.txt
----------------------------------------------------------------------
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index bf2d386..5b0612e 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -73,6 +73,7 @@ PyOpenSSL
PySmbClient
python-daemon
python-dateutil
+qds-sdk>=1.9.6
redis
rednose
requests
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6be02475/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 71c3f49..527ee5d 100644
--- a/setup.py
+++ b/setup.py
@@ -179,7 +179,7 @@ password = [
'flask-bcrypt>=0.7.1',
]
github_enterprise = ['Flask-OAuthlib>=0.9.1']
-qds = ['qds-sdk>=1.9.0']
+qds = ['qds-sdk>=1.9.6']
cloudant = ['cloudant>=0.5.9,<2.0'] # major update coming soon, clamp to 0.x
redis = ['redis>=2.10.5']
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6be02475/tests/contrib/operators/test_qubole_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_qubole_operator.py b/tests/contrib/operators/test_qubole_operator.py
new file mode 100644
index 0000000..0e6e13d
--- /dev/null
+++ b/tests/contrib/operators/test_qubole_operator.py
@@ -0,0 +1,94 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+from datetime import datetime
+
+from airflow.models import DAG, Connection
+from airflow.utils import db
+
+from airflow.contrib.hooks.qubole_hook import QuboleHook
+from airflow.contrib.operators.qubole_operator import QuboleOperator
+
+try:
+ from unittest import mock
+except ImportError:
+ try:
+ import mock
+ except ImportError:
+ mock = None
+
+DAG_ID="qubole_test_dag"
+TASK_ID="test_task"
+DEFAULT_CONN="qubole_default"
+TEMPLATE_CONN = "my_conn_id"
+DEFAULT_DATE = datetime(2017, 1, 1)
+
+
+class QuboleOperatorTest(unittest.TestCase):
+ def setUp(self):
+ db.merge_conn(
+ Connection(conn_id=DEFAULT_CONN, conn_type='HTTP'))
+
+ def test_init_with_default_connection(self):
+ op = QuboleOperator(task_id=TASK_ID)
+ self.assertEqual(op.task_id, TASK_ID)
+ self.assertEqual(op.qubole_conn_id, DEFAULT_CONN)
+
+ def test_init_with_template_connection(self):
+ dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
+
+ with dag:
+ task = QuboleOperator(task_id=TASK_ID, dag=dag,
+ qubole_conn_id="{{ dag_run.conf['qubole_conn_id'] }}")
+
+ result = task.render_template('qubole_conn_id', "{{ qubole_conn_id }}",
+ {'qubole_conn_id' : TEMPLATE_CONN})
+ self.assertEqual(task.task_id, TASK_ID)
+ self.assertEqual(result, TEMPLATE_CONN)
+
+ def test_get_hook(self):
+ dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
+
+ with dag:
+ task = QuboleOperator(task_id=TASK_ID, command_type='hivecmd', dag=dag)
+
+ hook = task.get_hook()
+ self.assertEqual(hook.__class__, QuboleHook)
+
+ def test_hyphen_args_note_id(self):
+ dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
+
+ with dag:
+ task = QuboleOperator(task_id=TASK_ID, command_type='sparkcmd',
+ note_id="123", dag=dag)
+ self.assertEqual(task.get_hook().create_cmd_args({'run_id':'dummy'})[0],
+ "--note-id=123")
+
+ def test_position_args_parameters(self):
+ dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
+
+ with dag:
+ task = QuboleOperator(task_id=TASK_ID, command_type='pigcmd',
+ parameters="key1=value1 key2=value2", dag=dag)
+
+ self.assertEqual(task.get_hook().create_cmd_args({'run_id':'dummy'})[1],
+ "key1=value1")
+ self.assertEqual(task.get_hook().create_cmd_args({'run_id':'dummy'})[2],
+ "key2=value2")
+
+
+
+