Posted to commits@airflow.apache.org by sa...@apache.org on 2017/07/10 19:14:24 UTC

incubator-airflow git commit: [AIRFLOW-1300] Enable table creation with TBLPROPERTIES

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 0fc45045a -> b532d8d77


[AIRFLOW-1300] Enable table creation with TBLPROPERTIES

Enable the TBLPROPERTIES parameter in the
load_df and load_file methods of HiveCliHook
and in the Hive transfer operators

Closes #2364 from
krishnabhupatiraju/tblproperties_hiveclihook
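
For context, a minimal usage sketch of the new parameter on
``HiveCliHook.load_file`` (the file path, table name, field types and
property values below are placeholders, not part of this commit):

    from collections import OrderedDict
    from airflow.hooks.hive_hooks import HiveCliHook

    hook = HiveCliHook(hive_cli_conn_id='hive_cli_default')
    hook.load_file(
        filepath='/tmp/baby_names.csv',    # placeholder file
        table='default.baby_names',        # placeholder table
        delimiter=',',
        # OrderedDict keeps the column order stable in the generated DDL
        field_dict=OrderedDict([('name', 'STRING'), ('num', 'INT')]),
        create=True,
        recreate=True,
        tblproperties={'comment': 'example property'})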


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b532d8d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b532d8d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b532d8d7

Branch: refs/heads/master
Commit: b532d8d7742e0a1141732654c2796bfc6dc6cabc
Parents: 0fc4504
Author: Krishna Bhupatiraju <kr...@airbnb.com>
Authored: Mon Jul 10 12:14:19 2017 -0700
Committer: Alex Guziel <al...@airbnb.com>
Committed: Mon Jul 10 12:14:19 2017 -0700

----------------------------------------------------------------------
 airflow/hooks/hive_hooks.py              | 29 ++++++++++++++++++++-------
 airflow/operators/mssql_to_hive.py       |  7 ++++++-
 airflow/operators/mysql_to_hive.py       |  7 ++++++-
 airflow/operators/s3_to_hive_operator.py | 10 +++++++--
 tests/operators/operators.py             | 16 +++++++++++++++
 5 files changed, 58 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b532d8d7/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index 3e7d2db..d120769 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -343,7 +343,8 @@ class HiveCliHook(BaseHook):
             create=True,
             overwrite=True,
             partition=None,
-            recreate=False):
+            recreate=False,
+            tblproperties=None):
         """
         Loads a local file into Hive
 
@@ -354,19 +355,28 @@ class HiveCliHook(BaseHook):
         stage the data into a temporary table before loading it into its
         final destination using a ``HiveOperator``.
 
+        :param filepath: local filepath of the file to load
+        :type filepath: str
         :param table: target Hive table, use dot notation to target a
             specific database
         :type table: str
+        :param delimiter: field delimiter in the file
+        :type delimiter: str
+        :param field_dict: a dictionary of the field names in the file
+            as keys and their Hive types as values
+        :type field_dict: dict
         :param create: whether to create the table if it doesn't exist
         :type create: bool
-        :param recreate: whether to drop and recreate the table at every
-            execution
-        :type recreate: bool
+        :param overwrite: whether to overwrite the data in table or partition
+        :type overwrite: bool
         :param partition: target partition as a dict of partition columns
             and values
         :type partition: dict
-        :param delimiter: field delimiter in the file
-        :type delimiter: str
+        :param recreate: whether to drop and recreate the table at every
+            execution
+        :type recreate: bool
+        :param tblproperties: TBLPROPERTIES of the hive table being created
+        :type tblproperties: dict
         """
         hql = ''
         if recreate:
@@ -383,7 +393,12 @@ class HiveCliHook(BaseHook):
                 hql += "PARTITIONED BY ({pfields})\n"
             hql += "ROW FORMAT DELIMITED\n"
             hql += "FIELDS TERMINATED BY '{delimiter}'\n"
-            hql += "STORED AS textfile;"
+            hql += "STORED AS textfile\n"
+            if tblproperties is not None:
+                tprops = ", ".join(
+                    ["'{0}'='{1}'".format(k, v) for k, v in tblproperties.items()])
+                hql += "TBLPROPERTIES({tprops})\n"
+        hql += ";"
         hql = hql.format(**locals())
         logging.info(hql)
         self.run_cli(hql)
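
To make the new branch concrete: the dict is rendered into a single
TBLPROPERTIES clause before the trailing semicolon. A sketch mirroring
the code above (the property name is made up):

    tblproperties = {'EXTERNAL': 'TRUE'}
    tprops = ", ".join(
        ["'{0}'='{1}'".format(k, v) for k, v in tblproperties.items()])
    # tprops == "'EXTERNAL'='TRUE'", so the generated DDL now ends with:
    #   STORED AS textfile
    #   TBLPROPERTIES('EXTERNAL'='TRUE');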

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b532d8d7/airflow/operators/mssql_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mssql_to_hive.py b/airflow/operators/mssql_to_hive.py
index 6d7521e..a0a2e10 100644
--- a/airflow/operators/mssql_to_hive.py
+++ b/airflow/operators/mssql_to_hive.py
@@ -57,6 +57,8 @@ class MsSqlToHiveTransfer(BaseOperator):
     :type mssql_conn_id: str
     :param hive_conn_id: destination hive connection
     :type hive_conn_id: str
+    :param tblproperties: TBLPROPERTIES of the hive table being created
+    :type tblproperties: dict
     """
 
     template_fields = ('sql', 'partition', 'hive_table')
@@ -74,6 +76,7 @@ class MsSqlToHiveTransfer(BaseOperator):
             delimiter=chr(1),
             mssql_conn_id='mssql_default',
             hive_cli_conn_id='hive_cli_default',
+            tblproperties=None,
             *args, **kwargs):
         super(MsSqlToHiveTransfer, self).__init__(*args, **kwargs)
         self.sql = sql
@@ -85,6 +88,7 @@ class MsSqlToHiveTransfer(BaseOperator):
         self.mssql_conn_id = mssql_conn_id
         self.hive_cli_conn_id = hive_cli_conn_id
         self.partition = partition or {}
+        self.tblproperties = tblproperties
 
     @classmethod
     def type_map(cls, mssql_type):
@@ -124,4 +128,5 @@ class MsSqlToHiveTransfer(BaseOperator):
                 create=self.create,
                 partition=self.partition,
                 delimiter=self.delimiter,
-                recreate=self.recreate)
+                recreate=self.recreate,
+                tblproperties=self.tblproperties)
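
A hedged sketch of the operator-level change (connection IDs, query and
table name are placeholders; ``MySqlToHiveTransfer`` and
``S3ToHiveTransfer`` below accept the same parameter):

    from airflow.operators.mssql_to_hive import MsSqlToHiveTransfer

    t = MsSqlToHiveTransfer(
        task_id='mssql_to_hive_example',
        sql='SELECT name, num FROM baby_names',  # placeholder query
        hive_table='default.baby_names',
        mssql_conn_id='mssql_default',
        hive_cli_conn_id='hive_cli_default',
        recreate=True,
        tblproperties={'comment': 'loaded by example task'},
        dag=dag)  # assumes an existing DAG object named ``dag``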

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b532d8d7/airflow/operators/mysql_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mysql_to_hive.py b/airflow/operators/mysql_to_hive.py
index 2fa2541..ad3ecae 100644
--- a/airflow/operators/mysql_to_hive.py
+++ b/airflow/operators/mysql_to_hive.py
@@ -58,6 +58,8 @@ class MySqlToHiveTransfer(BaseOperator):
     :type mysql_conn_id: str
     :param hive_conn_id: destination hive connection
     :type hive_conn_id: str
+    :param tblproperties: TBLPROPERTIES of the hive table being created
+    :type tblproperties: dict
     """
 
     template_fields = ('sql', 'partition', 'hive_table')
@@ -75,6 +77,7 @@ class MySqlToHiveTransfer(BaseOperator):
             delimiter=chr(1),
             mysql_conn_id='mysql_default',
             hive_cli_conn_id='hive_cli_default',
+            tblproperties=None,
             *args, **kwargs):
         super(MySqlToHiveTransfer, self).__init__(*args, **kwargs)
         self.sql = sql
@@ -86,6 +89,7 @@ class MySqlToHiveTransfer(BaseOperator):
         self.mysql_conn_id = mysql_conn_id
         self.hive_cli_conn_id = hive_cli_conn_id
         self.partition = partition or {}
+        self.tblproperties = tblproperties
 
     @classmethod
     def type_map(cls, mysql_type):
@@ -128,4 +132,5 @@ class MySqlToHiveTransfer(BaseOperator):
                 create=self.create,
                 partition=self.partition,
                 delimiter=self.delimiter,
-                recreate=self.recreate)
+                recreate=self.recreate,
+                tblproperties=self.tblproperties)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b532d8d7/airflow/operators/s3_to_hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/s3_to_hive_operator.py b/airflow/operators/s3_to_hive_operator.py
index 92340f8..7ae0616 100644
--- a/airflow/operators/s3_to_hive_operator.py
+++ b/airflow/operators/s3_to_hive_operator.py
@@ -78,6 +78,8 @@ class S3ToHiveTransfer(BaseOperator):
     :param input_compressed: Boolean to determine if file decompression is
         required to process headers
     :type input_compressed: bool
+    :param tblproperties: TBLPROPERTIES of the hive table being created
+    :type tblproperties: dict
     """
 
     template_fields = ('s3_key', 'partition', 'hive_table')
@@ -100,6 +102,7 @@ class S3ToHiveTransfer(BaseOperator):
             s3_conn_id='s3_default',
             hive_cli_conn_id='hive_cli_default',
             input_compressed=False,
+            tblproperties=None,
             *args, **kwargs):
         super(S3ToHiveTransfer, self).__init__(*args, **kwargs)
         self.s3_key = s3_key
@@ -115,6 +118,7 @@ class S3ToHiveTransfer(BaseOperator):
         self.hive_cli_conn_id = hive_cli_conn_id
         self.s3_conn_id = s3_conn_id
         self.input_compressed = input_compressed
+        self.tblproperties = tblproperties
 
         if (self.check_headers and
                 not (self.field_dict is not None and self.headers)):
@@ -156,7 +160,8 @@ class S3ToHiveTransfer(BaseOperator):
                     create=self.create,
                     partition=self.partition,
                     delimiter=self.delimiter,
-                    recreate=self.recreate)
+                    recreate=self.recreate,
+                    tblproperties=self.tblproperties)
             else:
                 # Decompressing file
                 if self.input_compressed:
@@ -193,7 +198,8 @@ class S3ToHiveTransfer(BaseOperator):
                                     create=self.create,
                                     partition=self.partition,
                                     delimiter=self.delimiter,
-                                    recreate=self.recreate)
+                                    recreate=self.recreate,
+                                    tblproperties=self.tblproperties)
 
     def _get_top_row_as_list(self, file_name):
         with open(file_name, 'rt') as f:
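
Since ``S3ToHiveTransfer`` also requires an explicit ``field_dict``, the
equivalent sketch differs slightly (bucket, key and connection IDs are
placeholders):

    from collections import OrderedDict
    from airflow.operators.s3_to_hive_operator import S3ToHiveTransfer

    t = S3ToHiveTransfer(
        task_id='s3_to_hive_example',
        s3_key='s3://example-bucket/baby_names.csv',  # placeholder key
        field_dict=OrderedDict([('name', 'STRING'), ('num', 'INT')]),
        hive_table='default.baby_names',
        delimiter=',',
        recreate=True,
        tblproperties={'skip.header.line.count': '1'},
        dag=dag)  # assumes an existing DAG object named ``dag``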

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b532d8d7/tests/operators/operators.py
----------------------------------------------------------------------
diff --git a/tests/operators/operators.py b/tests/operators/operators.py
index 62bc4bf..0f5abd5 100644
--- a/tests/operators/operators.py
+++ b/tests/operators/operators.py
@@ -283,3 +283,19 @@ class TransferTests(unittest.TestCase):
             delimiter=",",
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+    def test_mysql_to_hive_tblproperties(self):
+        # import airflow.operators
+        from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
+        sql = "SELECT * FROM baby_names LIMIT 1000;"
+        t = MySqlToHiveTransfer(
+            task_id='test_m2h',
+            mysql_conn_id='airflow_ci',
+            hive_cli_conn_id='beeline_default',
+            sql=sql,
+            hive_table='test_mysql_to_hive',
+            recreate=True,
+            delimiter=",",
+            tblproperties={'test_property': 'test_value'},
+            dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
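
One way to spot-check the result of a run like the test above is to read
the properties back through the same hook (a sketch; ``run_cli`` already
exists on ``HiveCliHook`` and takes a raw HQL string):

    from airflow.hooks.hive_hooks import HiveCliHook

    hook = HiveCliHook(hive_cli_conn_id='beeline_default')
    # should list test_property=test_value among the table properties
    hook.run_cli("SHOW TBLPROPERTIES test_mysql_to_hive;")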