You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by da...@apache.org on 2018/01/03 22:23:33 UTC
incubator-airflow git commit: [AIRFLOW-1963] Add config for HiveOperator mapred_queue
Repository: incubator-airflow
Updated Branches:
refs/heads/master 07c2a515e -> b3489b99e
[AIRFLOW-1963] Add config for HiveOperator mapred_queue
Adding a configuration setting for specifying a default mapred_queue for
hive jobs using the HiveOperator.
Closes #2915 from edgarRd/erod-hive-mapred-queue-config
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b3489b99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b3489b99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b3489b99
Branch: refs/heads/master
Commit: b3489b99e9140e25bdd08b78f57ba845c3edb358
Parents: 07c2a51
Author: Edgar Rodriguez <ed...@airbnb.com>
Authored: Wed Jan 3 14:23:09 2018 -0800
Committer: Dan Davydov <da...@airbnb.com>
Committed: Wed Jan 3 14:23:14 2018 -0800
----------------------------------------------------------------------
airflow/config_templates/default_airflow.cfg | 4 +-
airflow/config_templates/default_test.cfg | 3 +
airflow/hooks/hive_hooks.py | 4 +-
airflow/operators/hive_operator.py | 19 ++++--
scripts/ci/airflow_travis.cfg | 3 +
tests/operators/hive_operator.py | 82 ++++++++++++++---------
6 files changed, 77 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index a7a3b7d..d0dfb72 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -170,6 +170,9 @@ default_ram = 512
default_disk = 512
default_gpus = 0
+[hive]
+# Default mapreduce queue for HiveOperator tasks
+default_hive_mapred_queue =
[webserver]
# The base url of your website as airflow cannot guess what domain or
@@ -458,7 +461,6 @@ keytab = airflow.keytab
[github_enterprise]
api_rev = v3
-
[admin]
# UI to hide sensitive variable fields when set to True
hide_sensitive_variable_fields = True
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/airflow/config_templates/default_test.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index 85343ee..eaf3d03 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -51,6 +51,9 @@ auth_backend = airflow.api.auth.backend.default
[operators]
default_owner = airflow
+[hive]
+default_hive_mapred_queue = airflow
+
[webserver]
base_url = http://localhost:8080
web_server_host = 0.0.0.0
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index eb39469..3d986fd 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -25,6 +25,7 @@ import time
from tempfile import NamedTemporaryFile
import hive_metastore
+from airflow import configuration as conf
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow.utils.helpers import as_flattened_list
@@ -82,7 +83,8 @@ class HiveCliHook(BaseHook):
"Invalid Mapred Queue Priority. Valid values are: "
"{}".format(', '.join(HIVE_QUEUE_PRIORITIES)))
- self.mapred_queue = mapred_queue
+ self.mapred_queue = mapred_queue or conf.get('hive',
+ 'default_hive_mapred_queue')
self.mapred_queue_priority = mapred_queue_priority
self.mapred_job_name = mapred_job_name
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/airflow/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py
index 221feeb..ce98544 100644
--- a/airflow/operators/hive_operator.py
+++ b/airflow/operators/hive_operator.py
@@ -77,13 +77,19 @@ class HiveOperator(BaseOperator):
self.mapred_queue_priority = mapred_queue_priority
self.mapred_job_name = mapred_job_name
+ # assigned lazily - just for consistency we can create the attribute with a
+ # `None` initial value, later it will be populated by the execute method.
+ # This also makes `on_kill` implementation consistent since it assumes `self.hook`
+ # is defined.
+ self.hook = None
+
def get_hook(self):
return HiveCliHook(
- hive_cli_conn_id=self.hive_cli_conn_id,
- run_as=self.run_as,
- mapred_queue=self.mapred_queue,
- mapred_queue_priority=self.mapred_queue_priority,
- mapred_job_name=self.mapred_job_name)
+ hive_cli_conn_id=self.hive_cli_conn_id,
+ run_as=self.run_as,
+ mapred_queue=self.mapred_queue,
+ mapred_queue_priority=self.mapred_queue_priority,
+ mapred_job_name=self.mapred_job_name)
def prepare_template(self):
if self.hiveconf_jinja_translate:
@@ -103,4 +109,5 @@ class HiveOperator(BaseOperator):
self.hook.test_hql(hql=self.hql)
def on_kill(self):
- self.hook.kill()
+ if self.hook:
+ self.hook.kill()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/scripts/ci/airflow_travis.cfg
----------------------------------------------------------------------
diff --git a/scripts/ci/airflow_travis.cfg b/scripts/ci/airflow_travis.cfg
index c1ced74..03d7e59 100644
--- a/scripts/ci/airflow_travis.cfg
+++ b/scripts/ci/airflow_travis.cfg
@@ -31,6 +31,9 @@ base_url = http://localhost:8080
web_server_host = 0.0.0.0
web_server_port = 8080
+[hive]
+default_hive_mapred_queue = airflow
+
[email]
email_backend = airflow.utils.send_email_smtp
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/tests/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py
index 69166fd..279c7ba 100644
--- a/tests/operators/hive_operator.py
+++ b/tests/operators/hive_operator.py
@@ -30,6 +30,57 @@ DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
+class HiveEnvironmentTest(unittest.TestCase):
+
+ def setUp(self):
+ configuration.load_test_config()
+ args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
+ dag = DAG('test_dag_id', default_args=args)
+ self.dag = dag
+ self.hql = """
+ USE airflow;
+ DROP TABLE IF EXISTS static_babynames_partitioned;
+ CREATE TABLE IF NOT EXISTS static_babynames_partitioned (
+ state string,
+ year string,
+ name string,
+ gender string,
+ num int)
+ PARTITIONED BY (ds string);
+ INSERT OVERWRITE TABLE static_babynames_partitioned
+ PARTITION(ds='{{ ds }}')
+ SELECT state, year, name, gender, num FROM static_babynames;
+ """
+
+
+class HiveOperatorConfigTest(HiveEnvironmentTest):
+
+ def test_hive_airflow_default_config_queue(self):
+ t = operators.hive_operator.HiveOperator(
+ task_id='test_default_config_queue',
+ hql=self.hql,
+ mapred_queue_priority='HIGH',
+ mapred_job_name='airflow.test_default_config_queue',
+ dag=self.dag)
+
+ # just check that the correct default value in test_default.cfg is used
+ test_config_hive_mapred_queue = configuration.get('hive',
+ 'default_hive_mapred_queue')
+ self.assertEqual(t.get_hook().mapred_queue, test_config_hive_mapred_queue)
+
+ def test_hive_airflow_default_config_queue_override(self):
+ specific_mapred_queue = 'default'
+ t = operators.hive_operator.HiveOperator(
+ task_id='test_default_config_queue',
+ hql=self.hql,
+ mapred_queue=specific_mapred_queue,
+ mapred_queue_priority='HIGH',
+ mapred_job_name='airflow.test_default_config_queue',
+ dag=self.dag)
+
+ self.assertEqual(t.get_hook().mapred_queue, specific_mapred_queue)
+
+
if 'AIRFLOW_RUNALL_TESTS' in os.environ:
import airflow.hooks.hive_hooks
@@ -148,37 +199,15 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
self.assertEqual(sql, args[0])
self.assertEqual(self.nondefault_schema, kwargs['schema'])
- class HivePrestoTest(unittest.TestCase):
-
- def setUp(self):
- configuration.load_test_config()
- args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
- dag = DAG('test_dag_id', default_args=args)
- self.dag = dag
- self.hql = """
- USE airflow;
- DROP TABLE IF EXISTS static_babynames_partitioned;
- CREATE TABLE IF NOT EXISTS static_babynames_partitioned (
- state string,
- year string,
- name string,
- gender string,
- num int)
- PARTITIONED BY (ds string);
- INSERT OVERWRITE TABLE static_babynames_partitioned
- PARTITION(ds='{{ ds }}')
- SELECT state, year, name, gender, num FROM static_babynames;
- """
+ class HivePrestoTest(HiveEnvironmentTest):
def test_hive(self):
- import airflow.operators.hive_operator
t = operators.hive_operator.HiveOperator(
task_id='basic_hql', hql=self.hql, dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
def test_hive_queues(self):
- import airflow.operators.hive_operator
t = operators.hive_operator.HiveOperator(
task_id='test_hive_queues', hql=self.hql,
mapred_queue='default', mapred_queue_priority='HIGH',
@@ -188,13 +217,11 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
ignore_ti_state=True)
def test_hive_dryrun(self):
- import airflow.operators.hive_operator
t = operators.hive_operator.HiveOperator(
task_id='dry_run_basic_hql', hql=self.hql, dag=self.dag)
t.dry_run()
def test_beeline(self):
- import airflow.operators.hive_operator
t = operators.hive_operator.HiveOperator(
task_id='beeline_hql', hive_cli_conn_id='beeline_default',
hql=self.hql, dag=self.dag)
@@ -205,14 +232,12 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
sql = """
SELECT count(1) FROM airflow.static_babynames_partitioned;
"""
- import airflow.operators.presto_check_operator
t = operators.presto_check_operator.PrestoCheckOperator(
task_id='presto_check', sql=sql, dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
def test_presto_to_mysql(self):
- import airflow.operators.presto_to_mysql
t = operators.presto_to_mysql.PrestoToMySqlTransfer(
task_id='presto_to_mysql_check',
sql="""
@@ -253,7 +278,6 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
ignore_ti_state=True)
def test_hive_stats(self):
- import airflow.operators.hive_stats_operator
t = operators.hive_stats_operator.HiveStatsCollectionOperator(
task_id='hive_stats_check',
table="airflow.static_babynames_partitioned",
@@ -322,7 +346,6 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
ignore_ti_state=True)
def test_hive2samba(self):
- import airflow.operators.hive_to_samba_operator
t = operators.hive_to_samba_operator.Hive2SambaOperator(
task_id='hive2samba_check',
samba_conn_id='tableau_samba',
@@ -333,7 +356,6 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
ignore_ti_state=True)
def test_hive_to_mysql(self):
- import airflow.operators.hive_to_mysql
t = operators.hive_to_mysql.HiveToMySqlTransfer(
mysql_conn_id='airflow_db',
task_id='hive_to_mysql_check',