Posted to commits@airflow.apache.org by bo...@apache.org on 2018/01/12 10:01:19 UTC

incubator-airflow git commit: [AIRFLOW-1770] Allow HiveOperator to take in a file

Repository: incubator-airflow
Updated Branches:
  refs/heads/master c208a41fc -> eb994d683


[AIRFLOW-1770] Allow HiveOperator to take in a file

Clarify and upgrade HiveOperator. Document that
the hql parameter can also take a relative path
(from the dag file) to a hive script, templated
or not. Add the ability to template hiveconf
variables. Give the map reduce job name a
default value and set the queue under
additional hiveconf variables.

Closes #2752 from wolfier/AIRFLOW-1770
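
For illustration, a minimal usage sketch of the documented behavior
(dag id, task id, script path and queue name are hypothetical; the
script path is resolved relative to the dag file):

    # assumes hql/transform.hql sits next to the dag file
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.hive_operator import HiveOperator

    dag = DAG('hive_example', start_date=datetime(2018, 1, 1))

    run_script = HiveOperator(
        task_id='run_hive_script',
        hql='hql/transform.hql',        # relative path to a (templated) script
        hiveconf_jinja_translate=True,  # ${var} / ${hiveconf:var} -> {{ var }}
        mapred_queue='etl',             # now also sets mapred/tez queue names
        dag=dag)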


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eb994d68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eb994d68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eb994d68

Branch: refs/heads/master
Commit: eb994d683f244f63dd191a6640baaee66ffc8e29
Parents: c208a41
Author: Alan Ma <am...@pandora.com>
Authored: Fri Jan 12 11:01:11 2018 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Jan 12 11:01:11 2018 +0100

----------------------------------------------------------------------
 airflow/hooks/hive_hooks.py        |  9 ++++++++-
 airflow/operators/hive_operator.py | 23 +++++++++++++++++------
 tests/operators/hive_operator.py   | 11 +++++++++++
 3 files changed, 36 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb994d68/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index 3d986fd..47aebc8 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -183,7 +183,14 @@ class HiveCliHook(BaseHook):
                     hive_conf_params.extend(
                         ['-hiveconf',
                          'mapreduce.job.queuename={}'
-                         .format(self.mapred_queue)])
+                         .format(self.mapred_queue),
+                         '-hiveconf',
+                         'mapred.job.queue.name={}'
+                         .format(self.mapred_queue),
+                         '-hiveconf',
+                         'tez.job.queue.name={}'
+                         .format(self.mapred_queue)
+                         ])
 
                 if self.mapred_queue_priority:
                     hive_conf_params.extend(
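
With mapred_queue set, run_cli now passes the queue under the MR2, MR1
and Tez property names. A hedged sketch (connection id, queue name and
query are illustrative):

    from airflow.hooks.hive_hooks import HiveCliHook

    hook = HiveCliHook(hive_cli_conn_id='hive_cli_default',
                       mapred_queue='etl')
    # resulting CLI fragment (approximate):
    #   -hiveconf mapreduce.job.queuename=etl
    #   -hiveconf mapred.job.queue.name=etl
    #   -hiveconf tez.job.queue.name=etl
    hook.run_cli(hql='SELECT 1;')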

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb994d68/airflow/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py
index ce98544..ffb98ac 100644
--- a/airflow/operators/hive_operator.py
+++ b/airflow/operators/hive_operator.py
@@ -21,15 +21,17 @@ from airflow.utils.operator_helpers import context_to_airflow_vars
 
 class HiveOperator(BaseOperator):
     """
-    Executes hql code in a specific Hive database.
+    Executes hql code or hive script in a specific Hive database.
 
-    :param hql: the hql to be executed
+    :param hql: the hql to be executed. Note that you may also use
+        a relative path from the dag file of a (templated) hive script.
     :type hql: string
     :param hive_cli_conn_id: reference to the Hive database
     :type hive_cli_conn_id: string
     :param hiveconf_jinja_translate: when True, hiveconf-type templating
-        ${var} gets translated into jinja-type templating {{ var }}. Note that
-        you may want to use this along with the
+        ${var} and ${hiveconf:var} both get translated into jinja-type
+        templating {{ var }}.
+        Note that you may want to use this along with the
         ``DAG(user_defined_macros=myargs)`` parameter. View the DAG
         object documentation for more details.
     :type hiveconf_jinja_translate: boolean
@@ -46,7 +48,8 @@ class HiveOperator(BaseOperator):
     :type  mapred_job_name: string
     """
 
-    template_fields = ('hql', 'schema')
+    template_fields = ('hql', 'schema', 'hive_cli_conn_id', 'mapred_queue',
+                       'mapred_job_name', 'mapred_queue_priority')
     template_ext = ('.hql', '.sql',)
     ui_color = '#f0e4ec'
 
@@ -94,13 +97,21 @@ class HiveOperator(BaseOperator):
     def prepare_template(self):
         if self.hiveconf_jinja_translate:
             self.hql = re.sub(
-                "(\$\{([ a-zA-Z0-9_]*)\})", "{{ \g<2> }}", self.hql)
+                "(\$\{(hiveconf:)?([ a-zA-Z0-9_]*)\})", "{{ \g<3> }}", self.hql)
         if self.script_begin_tag and self.script_begin_tag in self.hql:
             self.hql = "\n".join(self.hql.split(self.script_begin_tag)[1:])
 
     def execute(self, context):
         self.log.info('Executing: %s', self.hql)
         self.hook = self.get_hook()
+
+        # if mapred_job_name is not set, build one from dag, task and execution date info
+        if not self.mapred_job_name:
+            ti = context['ti']
+            self.hook.mapred_job_name = 'Airflow HiveOperator task for {}.{}.{}.{}'\
+                .format(ti.hostname.split('.')[0], ti.dag_id, ti.task_id,
+                        ti.execution_date.isoformat())
+
         self.hook.run_cli(hql=self.hql, schema=self.schema,
                           hive_conf=context_to_airflow_vars(context))
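
When mapred_job_name is not supplied, execute() now derives one from the
task instance. A sketch of the resulting name (host, dag and task values
are illustrative):

    hostname = 'worker-1.example.com'
    dag_id, task_id = 'etl', 'load_orders'
    execution_date_iso = '2018-01-12T00:00:00'

    job_name = 'Airflow HiveOperator task for {}.{}.{}.{}'.format(
        hostname.split('.')[0], dag_id, task_id, execution_date_iso)
    # -> 'Airflow HiveOperator task for worker-1.etl.load_orders.2018-01-12T00:00:00'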
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb994d68/tests/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py
index 279c7ba..e547d4a 100644
--- a/tests/operators/hive_operator.py
+++ b/tests/operators/hive_operator.py
@@ -81,6 +81,17 @@ class HiveOperatorConfigTest(HiveEnvironmentTest):
         self.assertEqual(t.get_hook().mapred_queue, specific_mapred_queue)
 
 
+class HiveOperatorTest(HiveEnvironmentTest):
+
+    def test_hiveconf_jinja_translate(self):
+        hql = "SELECT ${num_col} FROM ${hiveconf:table};"
+        t = operators.hive_operator.HiveOperator(
+            hiveconf_jinja_translate=True,
+            task_id='dry_run_basic_hql', hql=hql, dag=self.dag)
+        t.prepare_template()
+        self.assertEqual(t.hql, "SELECT {{ num_col }} FROM {{ table }};")
+
+
 if 'AIRFLOW_RUNALL_TESTS' in os.environ:
 
     import airflow.hooks.hive_hooks
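
The translation exercised by the new test can also be checked in
isolation; a minimal sketch using the substitution from
prepare_template above:

    import re

    hql = "SELECT ${num_col} FROM ${hiveconf:table};"
    print(re.sub(r"(\$\{(hiveconf:)?([ a-zA-Z0-9_]*)\})", r"{{ \g<3> }}", hql))
    # SELECT {{ num_col }} FROM {{ table }};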