Posted to commits@airflow.apache.org by da...@apache.org on 2018/01/03 22:23:33 UTC

incubator-airflow git commit: [AIRFLOW-1963] Add config for HiveOperator mapred_queue

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 07c2a515e -> b3489b99e


[AIRFLOW-1963] Add config for HiveOperator mapred_queue

Adds a configuration setting for specifying a
default mapred_queue for Hive jobs that use
the HiveOperator.

Closes #2915 from edgarRd/erod-hive-mapred-queue-config
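
For illustration, a minimal DAG-level sketch of the new behavior (assumes
an existing `dag` object; the task ids, HQL, and the 'adhoc' queue name are
hypothetical): leaving mapred_queue unset now falls back to the [hive]
default_hive_mapred_queue config value, while an explicit value still wins.

    from airflow.operators.hive_operator import HiveOperator

    # No mapred_queue passed: the hook falls back to the configured default.
    uses_default = HiveOperator(
        task_id='uses_config_default',
        hql='SELECT 1;',
        dag=dag)

    # An explicit mapred_queue overrides the configured default.
    overrides = HiveOperator(
        task_id='overrides_default',
        hql='SELECT 1;',
        mapred_queue='adhoc',
        dag=dag)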


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b3489b99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b3489b99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b3489b99

Branch: refs/heads/master
Commit: b3489b99e9140e25bdd08b78f57ba845c3edb358
Parents: 07c2a51
Author: Edgar Rodriguez <ed...@airbnb.com>
Authored: Wed Jan 3 14:23:09 2018 -0800
Committer: Dan Davydov <da...@airbnb.com>
Committed: Wed Jan 3 14:23:14 2018 -0800

----------------------------------------------------------------------
 airflow/config_templates/default_airflow.cfg |  4 +-
 airflow/config_templates/default_test.cfg    |  3 +
 airflow/hooks/hive_hooks.py                  |  4 +-
 airflow/operators/hive_operator.py           | 19 ++++--
 scripts/ci/airflow_travis.cfg                |  3 +
 tests/operators/hive_operator.py             | 82 ++++++++++++++---------
 6 files changed, 77 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/airflow/config_templates/default_airflow.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index a7a3b7d..d0dfb72 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -170,6 +170,9 @@ default_ram = 512
 default_disk = 512
 default_gpus = 0
 
+[hive]
+# Default mapreduce queue for HiveOperator tasks
+default_hive_mapred_queue =
 
 [webserver]
 # The base url of your website as airflow cannot guess what domain or
@@ -458,7 +461,6 @@ keytab = airflow.keytab
 [github_enterprise]
 api_rev = v3
 
-
 [admin]
 # UI to hide sensitive variable fields when set to True
 hide_sensitive_variable_fields = True
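
For example, a deployment could route every HiveOperator task to a dedicated
queue by setting the new option in airflow.cfg (the queue name 'etl' is
hypothetical):

    [hive]
    default_hive_mapred_queue = etl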

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/airflow/config_templates/default_test.cfg
----------------------------------------------------------------------
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index 85343ee..eaf3d03 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -51,6 +51,9 @@ auth_backend = airflow.api.auth.backend.default
 [operators]
 default_owner = airflow
 
+[hive]
+default_hive_mapred_queue = airflow
+
 [webserver]
 base_url = http://localhost:8080
 web_server_host = 0.0.0.0

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index eb39469..3d986fd 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -25,6 +25,7 @@ import time
 from tempfile import NamedTemporaryFile
 import hive_metastore
 
+from airflow import configuration as conf
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
 from airflow.utils.helpers import as_flattened_list
@@ -82,7 +83,8 @@ class HiveCliHook(BaseHook):
                     "Invalid Mapred Queue Priority.  Valid values are: "
                     "{}".format(', '.join(HIVE_QUEUE_PRIORITIES)))
 
-        self.mapred_queue = mapred_queue
+        self.mapred_queue = mapred_queue or conf.get('hive',
+                                                     'default_hive_mapred_queue')
         self.mapred_queue_priority = mapred_queue_priority
         self.mapred_job_name = mapred_job_name
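
A minimal standalone sketch of the precedence this fallback establishes
(plain Python, hypothetical queue names; not the actual hook code):

    def resolve_queue(mapred_queue, config_default):
        # An explicitly passed queue always wins; otherwise fall back to
        # the [hive] default_hive_mapred_queue config value.
        return mapred_queue or config_default

    assert resolve_queue('adhoc', 'etl') == 'adhoc'  # explicit override wins
    assert resolve_queue(None, 'etl') == 'etl'       # falls back to config
    assert resolve_queue(None, '') == ''             # empty config stays falsy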
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/airflow/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_operator.py b/airflow/operators/hive_operator.py
index 221feeb..ce98544 100644
--- a/airflow/operators/hive_operator.py
+++ b/airflow/operators/hive_operator.py
@@ -77,13 +77,19 @@ class HiveOperator(BaseOperator):
         self.mapred_queue_priority = mapred_queue_priority
         self.mapred_job_name = mapred_job_name
 
+        # Assigned lazily: for consistency we create the attribute with a
+        # `None` initial value; it is populated later by the execute method.
+        # This also keeps the `on_kill` implementation consistent, since it
+        # assumes `self.hook` is defined.
+        self.hook = None
+
     def get_hook(self):
         return HiveCliHook(
-                        hive_cli_conn_id=self.hive_cli_conn_id,
-                        run_as=self.run_as,
-                        mapred_queue=self.mapred_queue,
-                        mapred_queue_priority=self.mapred_queue_priority,
-                        mapred_job_name=self.mapred_job_name)
+            hive_cli_conn_id=self.hive_cli_conn_id,
+            run_as=self.run_as,
+            mapred_queue=self.mapred_queue,
+            mapred_queue_priority=self.mapred_queue_priority,
+            mapred_job_name=self.mapred_job_name)
 
     def prepare_template(self):
         if self.hiveconf_jinja_translate:
@@ -103,4 +109,5 @@ class HiveOperator(BaseOperator):
         self.hook.test_hql(hql=self.hql)
 
     def on_kill(self):
-        self.hook.kill()
+        if self.hook:
+            self.hook.kill()
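
The None-guard matters because on_kill() can fire before execute() has
created the hook. A simplified, self-contained sketch of the pattern
(FakeHook and LazyHookOperator are illustrative, not the real classes):

    class FakeHook(object):
        def kill(self):
            print('killing job')

    class LazyHookOperator(object):
        def __init__(self):
            # Created lazily; execute() populates it, so on_kill() must
            # tolerate the attribute still being None.
            self.hook = None

        def execute(self):
            self.hook = FakeHook()  # stand-in for HiveCliHook(...)

        def on_kill(self):
            if self.hook:  # guard: the task may be killed before execute() ran
                self.hook.kill()

    op = LazyHookOperator()
    op.on_kill()  # safe no-op even though execute() never ran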

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/scripts/ci/airflow_travis.cfg
----------------------------------------------------------------------
diff --git a/scripts/ci/airflow_travis.cfg b/scripts/ci/airflow_travis.cfg
index c1ced74..03d7e59 100644
--- a/scripts/ci/airflow_travis.cfg
+++ b/scripts/ci/airflow_travis.cfg
@@ -31,6 +31,9 @@ base_url = http://localhost:8080
 web_server_host = 0.0.0.0
 web_server_port = 8080
 
+[hive]
+default_hive_mapred_queue = airflow
+
 [email]
 email_backend = airflow.utils.send_email_smtp
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3489b99/tests/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py
index 69166fd..279c7ba 100644
--- a/tests/operators/hive_operator.py
+++ b/tests/operators/hive_operator.py
@@ -30,6 +30,57 @@ DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
 DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
 
 
+class HiveEnvironmentTest(unittest.TestCase):
+
+    def setUp(self):
+        configuration.load_test_config()
+        args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
+        dag = DAG('test_dag_id', default_args=args)
+        self.dag = dag
+        self.hql = """
+        USE airflow;
+        DROP TABLE IF EXISTS static_babynames_partitioned;
+        CREATE TABLE IF NOT EXISTS static_babynames_partitioned (
+            state string,
+            year string,
+            name string,
+            gender string,
+            num int)
+        PARTITIONED BY (ds string);
+        INSERT OVERWRITE TABLE static_babynames_partitioned
+            PARTITION(ds='{{ ds }}')
+        SELECT state, year, name, gender, num FROM static_babynames;
+        """
+
+
+class HiveOperatorConfigTest(HiveEnvironmentTest):
+
+    def test_hive_airflow_default_config_queue(self):
+        t = operators.hive_operator.HiveOperator(
+            task_id='test_default_config_queue',
+            hql=self.hql,
+            mapred_queue_priority='HIGH',
+            mapred_job_name='airflow.test_default_config_queue',
+            dag=self.dag)
+
+        # just check that the correct default value from default_test.cfg is used
+        test_config_hive_mapred_queue = configuration.get('hive',
+                                                          'default_hive_mapred_queue')
+        self.assertEqual(t.get_hook().mapred_queue, test_config_hive_mapred_queue)
+
+    def test_hive_airflow_default_config_queue_override(self):
+        specific_mapred_queue = 'default'
+        t = operators.hive_operator.HiveOperator(
+            task_id='test_default_config_queue',
+            hql=self.hql,
+            mapred_queue=specific_mapred_queue,
+            mapred_queue_priority='HIGH',
+            mapred_job_name='airflow.test_default_config_queue',
+            dag=self.dag)
+
+        self.assertEqual(t.get_hook().mapred_queue, specific_mapred_queue)
+
+
 if 'AIRFLOW_RUNALL_TESTS' in os.environ:
 
     import airflow.hooks.hive_hooks
@@ -148,37 +199,15 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             self.assertEqual(sql, args[0])
             self.assertEqual(self.nondefault_schema, kwargs['schema'])
 
-    class HivePrestoTest(unittest.TestCase):
-
-        def setUp(self):
-            configuration.load_test_config()
-            args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
-            dag = DAG('test_dag_id', default_args=args)
-            self.dag = dag
-            self.hql = """
-            USE airflow;
-            DROP TABLE IF EXISTS static_babynames_partitioned;
-            CREATE TABLE IF NOT EXISTS static_babynames_partitioned (
-                state string,
-                year string,
-                name string,
-                gender string,
-                num int)
-            PARTITIONED BY (ds string);
-            INSERT OVERWRITE TABLE static_babynames_partitioned
-                PARTITION(ds='{{ ds }}')
-            SELECT state, year, name, gender, num FROM static_babynames;
-            """
+    class HivePrestoTest(HiveEnvironmentTest):
 
         def test_hive(self):
-            import airflow.operators.hive_operator
             t = operators.hive_operator.HiveOperator(
                 task_id='basic_hql', hql=self.hql, dag=self.dag)
             t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
                   ignore_ti_state=True)
 
         def test_hive_queues(self):
-            import airflow.operators.hive_operator
             t = operators.hive_operator.HiveOperator(
                 task_id='test_hive_queues', hql=self.hql,
                 mapred_queue='default', mapred_queue_priority='HIGH',
@@ -188,13 +217,11 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                   ignore_ti_state=True)
 
         def test_hive_dryrun(self):
-            import airflow.operators.hive_operator
             t = operators.hive_operator.HiveOperator(
                 task_id='dry_run_basic_hql', hql=self.hql, dag=self.dag)
             t.dry_run()
 
         def test_beeline(self):
-            import airflow.operators.hive_operator
             t = operators.hive_operator.HiveOperator(
                 task_id='beeline_hql', hive_cli_conn_id='beeline_default',
                 hql=self.hql, dag=self.dag)
@@ -205,14 +232,12 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
             sql = """
             SELECT count(1) FROM airflow.static_babynames_partitioned;
             """
-            import airflow.operators.presto_check_operator
             t = operators.presto_check_operator.PrestoCheckOperator(
                 task_id='presto_check', sql=sql, dag=self.dag)
             t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
                   ignore_ti_state=True)
 
         def test_presto_to_mysql(self):
-            import airflow.operators.presto_to_mysql
             t = operators.presto_to_mysql.PrestoToMySqlTransfer(
                 task_id='presto_to_mysql_check',
                 sql="""
@@ -253,7 +278,6 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                   ignore_ti_state=True)
 
         def test_hive_stats(self):
-            import airflow.operators.hive_stats_operator
             t = operators.hive_stats_operator.HiveStatsCollectionOperator(
                 task_id='hive_stats_check',
                 table="airflow.static_babynames_partitioned",
@@ -322,7 +346,6 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                   ignore_ti_state=True)
 
         def test_hive2samba(self):
-            import airflow.operators.hive_to_samba_operator
             t = operators.hive_to_samba_operator.Hive2SambaOperator(
                 task_id='hive2samba_check',
                 samba_conn_id='tableau_samba',
@@ -333,7 +356,6 @@ if 'AIRFLOW_RUNALL_TESTS' in os.environ:
                   ignore_ti_state=True)
 
         def test_hive_to_mysql(self):
-            import airflow.operators.hive_to_mysql
             t = operators.hive_to_mysql.HiveToMySqlTransfer(
                 mysql_conn_id='airflow_db',
                 task_id='hive_to_mysql_check',