You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/01/04 23:35:09 UTC

[GitHub] feng-tao closed pull request #4432: [AIRFLOW-3622] Add ability to pass hive_conf to HiveToMysqlTransfer

feng-tao closed pull request #4432: [AIRFLOW-3622] Add ability to pass hive_conf to HiveToMysqlTransfer
URL: https://github.com/apache/incubator-airflow/pull/4432
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not otherwise display the original diff once the pull request is closed):

diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index 42a42318c6..a79a46d18a 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -902,7 +902,7 @@ def to_csv(
 
         self.log.info("Done. Loaded a total of %s rows.", i)
 
-    def get_records(self, hql, schema='default'):
+    def get_records(self, hql, schema='default', hive_conf=None):
         """
         Get a set of records from a Hive query.
 
@@ -911,7 +911,7 @@ def get_records(self, hql, schema='default'):
         >>> len(hh.get_records(sql))
         100
         """
-        return self.get_results(hql, schema=schema)['data']
+        return self.get_results(hql, schema=schema, hive_conf=hive_conf)['data']
 
     def get_pandas_df(self, hql, schema='default'):
         """
diff --git a/airflow/operators/hive_to_mysql.py b/airflow/operators/hive_to_mysql.py
index 5809b76046..fa0b49f420 100644
--- a/airflow/operators/hive_to_mysql.py
+++ b/airflow/operators/hive_to_mysql.py
@@ -70,6 +70,7 @@ def __init__(self,
                  mysql_preoperator=None,
                  mysql_postoperator=None,
                  bulk_load=False,
+                 hive_conf=None,
                  *args, **kwargs):
         super(HiveToMySqlTransfer, self).__init__(*args, **kwargs)
         self.sql = sql
@@ -79,11 +80,15 @@ def __init__(self,
         self.mysql_postoperator = mysql_postoperator
         self.hiveserver2_conn_id = hiveserver2_conn_id
         self.bulk_load = bulk_load
+        self.hive_conf = hive_conf
 
     def execute(self, context):
         hive = HiveServer2Hook(hiveserver2_conn_id=self.hiveserver2_conn_id)
 
         self.log.info("Extracting data from Hive: %s", self.sql)
+        hive_conf = context_to_airflow_vars(context)
+        if self.hive_conf:
+            hive_conf.update(self.hive_conf)
         if self.bulk_load:
             tmp_file = NamedTemporaryFile()
             hive.to_csv(self.sql,
@@ -91,9 +96,9 @@ def execute(self, context):
                         delimiter='\t',
                         lineterminator='\n',
                         output_header=False,
-                        hive_conf=context_to_airflow_vars(context))
+                        hive_conf=hive_conf)
         else:
-            hive_results = hive.get_records(self.sql)
+            hive_results = hive.get_records(self.sql, hive_conf=hive_conf)
 
         mysql = MySqlHook(mysql_conn_id=self.mysql_conn_id)
 
diff --git a/tests/operators/test_hive_to_mysql.py b/tests/operators/test_hive_to_mysql.py
index ec69031199..799d9112fd 100644
--- a/tests/operators/test_hive_to_mysql.py
+++ b/tests/operators/test_hive_to_mysql.py
@@ -92,3 +92,18 @@ def test_execute_bulk_load(self, mock_hive_hook, mock_tmp_file, mock_mysql_hook)
             tmp_file=mock_tmp_file.return_value.name
         )
         mock_tmp_file.return_value.close.assert_called_once_with()
+
+    @patch('airflow.operators.hive_to_mysql.MySqlHook')
+    @patch('airflow.operators.hive_to_mysql.HiveServer2Hook')
+    def test_execute_with_hive_conf(self, mock_hive_hook, mock_mysql_hook):
+        context = {}
+        self.kwargs.update(dict(hive_conf={'mapreduce.job.queuename': 'fake_queue'}))
+
+        HiveToMySqlTransfer(**self.kwargs).execute(context=context)
+
+        hive_conf = context_to_airflow_vars(context)
+        hive_conf.update(self.kwargs['hive_conf'])
+        mock_hive_hook.return_value.get_records.assert_called_once_with(
+            self.kwargs['sql'],
+            hive_conf=hive_conf
+        )


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services