You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2018/01/12 10:01:19 UTC
incubator-airflow git commit: [AIRFLOW-1770] Allow HiveOperator to
take in a file
Repository: incubator-airflow
Updated Branches:
refs/heads/master c208a41fc -> eb994d683
[AIRFLOW-1770] Allow HiveOperator to take in a file
Clarify and upgrade HiveOperator. Document
that the hql parameter can also be a path,
relative to the dag file, to a hive script,
templated or not. Add the ability to
template hiveconf variables. Add a default
value for the map reduce job name, and add
the updated hiveconf variables for the queue.
Closes #2752 from wolfier/AIRFLOW-1770
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eb994d68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eb994d68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eb994d68
Branch: refs/heads/master
Commit: eb994d683f244f63dd191a6640baaee66ffc8e29
Parents: c208a41
Author: Alan Ma <am...@pandora.com>
Authored: Fri Jan 12 11:01:11 2018 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Fri Jan 12 11:01:11 2018 +0100
----------------------------------------------------------------------
airflow/hooks/hive_hooks.py | 9 ++++++++-
airflow/operators/hive_operator.py | 23 +++++++++++++++++------
tests/operators/hive_operator.py | 11 +++++++++++
3 files changed, 36 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb994d68/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index 3d986fd..47aebc8 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -183,7 +183,14 @@ class HiveCliHook(BaseHook):
hive_conf_params.extend(
['-hiveconf',
'mapreduce.job.queuename={}'
- .format(self.mapred_queue)])
+ .format(self.mapred_queue),
+ '-hiveconf',
+ 'mapred.job.queue.name={}'
+ .format(self.mapred_queue),
+ '-hiveconf',
+ 'tez.job.queue.name={}'
+ .format(self.mapred_queue)
+ ])
if self.mapred_queue_priority:
hive_conf_params.extend(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb994d68/airflow/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py
index ce98544..ffb98ac 100644
--- a/airflow/operators/hive_operator.py
+++ b/airflow/operators/hive_operator.py
@@ -21,15 +21,17 @@ from airflow.utils.operator_helpers import context_to_airflow_vars
class HiveOperator(BaseOperator):
"""
- Executes hql code in a specific Hive database.
+ Executes hql code or hive script in a specific Hive database.
- :param hql: the hql to be executed
+ :param hql: the hql to be executed. Note that you may also use
+ a relative path from the dag file of a (template) hive script.
:type hql: string
:param hive_cli_conn_id: reference to the Hive database
:type hive_cli_conn_id: string
:param hiveconf_jinja_translate: when True, hiveconf-type templating
- ${var} gets translated into jinja-type templating {{ var }}. Note that
- you may want to use this along with the
+ ${var} gets translated into jinja-type templating {{ var }} and
+ ${hiveconf:var} gets translated into jinja-type templating {{ var }}.
+ Note that you may want to use this along with the
``DAG(user_defined_macros=myargs)`` parameter. View the DAG
object documentation for more details.
:type hiveconf_jinja_translate: boolean
@@ -46,7 +48,8 @@ class HiveOperator(BaseOperator):
:type mapred_job_name: string
"""
- template_fields = ('hql', 'schema')
+ template_fields = ('hql', 'schema', 'hive_cli_conn_id', 'mapred_queue',
+ 'mapred_job_name', 'mapred_queue_priority')
template_ext = ('.hql', '.sql',)
ui_color = '#f0e4ec'
@@ -94,13 +97,21 @@ class HiveOperator(BaseOperator):
def prepare_template(self):
if self.hiveconf_jinja_translate:
self.hql = re.sub(
- "(\$\{([ a-zA-Z0-9_]*)\})", "{{ \g<2> }}", self.hql)
+ "(\$\{(hiveconf:)?([ a-zA-Z0-9_]*)\})", "{{ \g<3> }}", self.hql)
if self.script_begin_tag and self.script_begin_tag in self.hql:
self.hql = "\n".join(self.hql.split(self.script_begin_tag)[1:])
def execute(self, context):
self.log.info('Executing: %s', self.hql)
self.hook = self.get_hook()
+
+ # set the mapred_job_name if it's not set with dag, task, execution time info
+ if not self.mapred_job_name:
+ ti = context['ti']
+ self.hook.mapred_job_name = 'Airflow HiveOperator task for {}.{}.{}.{}'\
+ .format(ti.hostname.split('.')[0], ti.dag_id, ti.task_id,
+ ti.execution_date.isoformat())
+
self.hook.run_cli(hql=self.hql, schema=self.schema,
hive_conf=context_to_airflow_vars(context))
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb994d68/tests/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py
index 279c7ba..e547d4a 100644
--- a/tests/operators/hive_operator.py
+++ b/tests/operators/hive_operator.py
@@ -81,6 +81,17 @@ class HiveOperatorConfigTest(HiveEnvironmentTest):
self.assertEqual(t.get_hook().mapred_queue, specific_mapred_queue)
+class HiveOperatorTest(HiveEnvironmentTest):
+
+ def test_hiveconf_jinja_translate(self):
+ hql = "SELECT ${num_col} FROM ${hiveconf:table};"
+ t = operators.hive_operator.HiveOperator(
+ hiveconf_jinja_translate=True,
+ task_id='dry_run_basic_hql', hql=hql, dag=self.dag)
+ t.prepare_template()
+ self.assertEqual(t.hql, "SELECT {{ num_col }} FROM {{ table }};")
+
+
if 'AIRFLOW_RUNALL_TESTS' in os.environ:
import airflow.hooks.hive_hooks