Posted to commits@airflow.apache.org by sa...@apache.org on 2017/07/10 19:14:24 UTC
incubator-airflow git commit: [AIRFLOW-1300] Enable table creation with TBLPROPERTIES
Repository: incubator-airflow
Updated Branches:
refs/heads/master 0fc45045a -> b532d8d77
[AIRFLOW-1300] Enable table creation with TBLPROPERTIES
Enable the TBLPROPERTIES parameter in the load_df and load_file methods of
HiveCliHook and in the Hive transfer operators (MsSqlToHiveTransfer,
MySqlToHiveTransfer, S3ToHiveTransfer).

Closes #2364 from krishnabhupatiraju/tblproperties_hiveclihook
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b532d8d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b532d8d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b532d8d7
Branch: refs/heads/master
Commit: b532d8d7742e0a1141732654c2796bfc6dc6cabc
Parents: 0fc4504
Author: Krishna Bhupatiraju <kr...@airbnb.com>
Authored: Mon Jul 10 12:14:19 2017 -0700
Committer: Alex Guziel <al...@airbnb.com>
Committed: Mon Jul 10 12:14:19 2017 -0700
----------------------------------------------------------------------
airflow/hooks/hive_hooks.py | 29 ++++++++++++++++++++-------
airflow/operators/mssql_to_hive.py | 7 ++++++-
airflow/operators/mysql_to_hive.py | 7 ++++++-
airflow/operators/s3_to_hive_operator.py | 10 +++++++--
tests/operators/operators.py | 16 +++++++++++++++
5 files changed, 58 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
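
For context, a minimal usage sketch of the new parameter on HiveCliHook.load_file; the connection id, file path, table, and TBLPROPERTIES key below are illustrative, not taken from the commit:

    from airflow.hooks.hive_hooks import HiveCliHook

    # Hypothetical connection id and local CSV file.
    hook = HiveCliHook(hive_cli_conn_id='hive_cli_default')
    hook.load_file(
        filepath='/tmp/baby_names.csv',
        table='staging.baby_names',  # dot notation targets a specific database
        delimiter=',',
        field_dict={'name': 'STRING', 'num': 'INT'},
        create=True,
        recreate=True,
        tblproperties={'serialization.null.format': ''},  # new in this commit
    )
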
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b532d8d7/airflow/hooks/hive_hooks.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/hive_hooks.py b/airflow/hooks/hive_hooks.py
index 3e7d2db..d120769 100644
--- a/airflow/hooks/hive_hooks.py
+++ b/airflow/hooks/hive_hooks.py
@@ -343,7 +343,8 @@ class HiveCliHook(BaseHook):
             create=True,
             overwrite=True,
             partition=None,
-            recreate=False):
+            recreate=False,
+            tblproperties=None):
         """
         Loads a local file into Hive
@@ -354,19 +355,28 @@ class HiveCliHook(BaseHook):
         stage the data into a temporary table before loading it into its
         final destination using a ``HiveOperator``.

+        :param filepath: local filepath of the file to load
+        :type filepath: str
         :param table: target Hive table, use dot notation to target a
             specific database
         :type table: str
+        :param delimiter: field delimiter in the file
+        :type delimiter: str
+        :param field_dict: A dictionary of the field names in the file
+            as keys and their Hive types as values
+        :type field_dict: dict
         :param create: whether to create the table if it doesn't exist
         :type create: bool
-        :param recreate: whether to drop and recreate the table at every
-            execution
-        :type recreate: bool
+        :param overwrite: whether to overwrite the data in the table or partition
+        :type overwrite: bool
         :param partition: target partition as a dict of partition columns
             and values
         :type partition: dict
-        :param delimiter: field delimiter in the file
-        :type delimiter: str
+        :param recreate: whether to drop and recreate the table at every
+            execution
+        :type recreate: bool
+        :param tblproperties: TBLPROPERTIES of the hive table being created
+        :type tblproperties: dict
         """
         hql = ''
         if recreate:
@@ -383,7 +393,12 @@ class HiveCliHook(BaseHook):
hql += "PARTITIONED BY ({pfields})\n"
hql += "ROW FORMAT DELIMITED\n"
hql += "FIELDS TERMINATED BY '{delimiter}'\n"
- hql += "STORED AS textfile;"
+ hql += "STORED AS textfile\n"
+ if tblproperties is not None:
+ tprops = ", ".join(
+ ["'{0}'='{1}'".format(k, v) for k, v in tblproperties.items()])
+ hql += "TBLPROPERTIES({tprops})\n"
+ hql += ";"
hql = hql.format(**locals())
logging.info(hql)
self.run_cli(hql)
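
To see exactly what the new branch renders, the join expression added above can be run on its own (a single-key dict is used here because plain-dict ordering is unspecified on the Python versions of the time):

    tblproperties = {'serialization.null.format': ''}
    tprops = ", ".join(
        ["'{0}'='{1}'".format(k, v) for k, v in tblproperties.items()])
    print("TBLPROPERTIES({0})".format(tprops))
    # prints: TBLPROPERTIES('serialization.null.format'='')
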
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b532d8d7/airflow/operators/mssql_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mssql_to_hive.py b/airflow/operators/mssql_to_hive.py
index 6d7521e..a0a2e10 100644
--- a/airflow/operators/mssql_to_hive.py
+++ b/airflow/operators/mssql_to_hive.py
@@ -57,6 +57,8 @@ class MsSqlToHiveTransfer(BaseOperator):
     :type mssql_conn_id: str
     :param hive_conn_id: destination hive connection
     :type hive_conn_id: str
+    :param tblproperties: TBLPROPERTIES of the hive table being created
+    :type tblproperties: dict
     """

     template_fields = ('sql', 'partition', 'hive_table')
@@ -74,6 +76,7 @@ class MsSqlToHiveTransfer(BaseOperator):
             delimiter=chr(1),
             mssql_conn_id='mssql_default',
             hive_cli_conn_id='hive_cli_default',
+            tblproperties=None,
             *args, **kwargs):
         super(MsSqlToHiveTransfer, self).__init__(*args, **kwargs)
         self.sql = sql
@@ -85,6 +88,7 @@ class MsSqlToHiveTransfer(BaseOperator):
         self.mssql_conn_id = mssql_conn_id
         self.hive_cli_conn_id = hive_cli_conn_id
         self.partition = partition or {}
+        self.tblproperties = tblproperties

     @classmethod
     def type_map(cls, mssql_type):
@@ -124,4 +128,5 @@ class MsSqlToHiveTransfer(BaseOperator):
             create=self.create,
             partition=self.partition,
             delimiter=self.delimiter,
-            recreate=self.recreate)
+            recreate=self.recreate,
+            tblproperties=self.tblproperties)
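
A hypothetical instantiation of the operator with the new argument (the DAG object, task id, query, and table are illustrative; the connection ids are the operator defaults). The MySqlToHiveTransfer change below is identical apart from the connection id, and the new test at the end of this commit exercises it:

    from airflow.operators.mssql_to_hive import MsSqlToHiveTransfer

    t = MsSqlToHiveTransfer(
        task_id='mssql_to_hive_example',   # hypothetical task id
        sql='SELECT * FROM dbo.orders',    # hypothetical source query
        hive_table='staging.orders',
        recreate=True,
        delimiter=',',
        mssql_conn_id='mssql_default',
        hive_cli_conn_id='hive_cli_default',
        tblproperties={'comment': 'loaded from MSSQL by Airflow'},
        dag=dag,                           # assumes an existing DAG object
    )
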
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b532d8d7/airflow/operators/mysql_to_hive.py
----------------------------------------------------------------------
diff --git a/airflow/operators/mysql_to_hive.py b/airflow/operators/mysql_to_hive.py
index 2fa2541..ad3ecae 100644
--- a/airflow/operators/mysql_to_hive.py
+++ b/airflow/operators/mysql_to_hive.py
@@ -58,6 +58,8 @@ class MySqlToHiveTransfer(BaseOperator):
     :type mysql_conn_id: str
     :param hive_conn_id: destination hive connection
     :type hive_conn_id: str
+    :param tblproperties: TBLPROPERTIES of the hive table being created
+    :type tblproperties: dict
     """

     template_fields = ('sql', 'partition', 'hive_table')
@@ -75,6 +77,7 @@ class MySqlToHiveTransfer(BaseOperator):
             delimiter=chr(1),
             mysql_conn_id='mysql_default',
             hive_cli_conn_id='hive_cli_default',
+            tblproperties=None,
             *args, **kwargs):
         super(MySqlToHiveTransfer, self).__init__(*args, **kwargs)
         self.sql = sql
@@ -86,6 +89,7 @@ class MySqlToHiveTransfer(BaseOperator):
         self.mysql_conn_id = mysql_conn_id
         self.hive_cli_conn_id = hive_cli_conn_id
         self.partition = partition or {}
+        self.tblproperties = tblproperties

     @classmethod
     def type_map(cls, mysql_type):
@@ -128,4 +132,5 @@ class MySqlToHiveTransfer(BaseOperator):
             create=self.create,
             partition=self.partition,
             delimiter=self.delimiter,
-            recreate=self.recreate)
+            recreate=self.recreate,
+            tblproperties=self.tblproperties)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b532d8d7/airflow/operators/s3_to_hive_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/s3_to_hive_operator.py b/airflow/operators/s3_to_hive_operator.py
index 92340f8..7ae0616 100644
--- a/airflow/operators/s3_to_hive_operator.py
+++ b/airflow/operators/s3_to_hive_operator.py
@@ -78,6 +78,8 @@ class S3ToHiveTransfer(BaseOperator):
     :param input_compressed: Boolean to determine if file decompression is
         required to process headers
     :type input_compressed: bool
+    :param tblproperties: TBLPROPERTIES of the hive table being created
+    :type tblproperties: dict
     """

     template_fields = ('s3_key', 'partition', 'hive_table')
@@ -100,6 +102,7 @@ class S3ToHiveTransfer(BaseOperator):
             s3_conn_id='s3_default',
             hive_cli_conn_id='hive_cli_default',
             input_compressed=False,
+            tblproperties=None,
             *args, **kwargs):
         super(S3ToHiveTransfer, self).__init__(*args, **kwargs)
         self.s3_key = s3_key
@@ -115,6 +118,7 @@ class S3ToHiveTransfer(BaseOperator):
         self.hive_cli_conn_id = hive_cli_conn_id
         self.s3_conn_id = s3_conn_id
         self.input_compressed = input_compressed
+        self.tblproperties = tblproperties

         if (self.check_headers and
                 not (self.field_dict is not None and self.headers)):
@@ -156,7 +160,8 @@ class S3ToHiveTransfer(BaseOperator):
                     create=self.create,
                     partition=self.partition,
                     delimiter=self.delimiter,
-                    recreate=self.recreate)
+                    recreate=self.recreate,
+                    tblproperties=self.tblproperties)
             else:
                 # Decompressing file
                 if self.input_compressed:
@@ -193,7 +198,8 @@ class S3ToHiveTransfer(BaseOperator):
                     create=self.create,
                     partition=self.partition,
                     delimiter=self.delimiter,
-                    recreate=self.recreate)
+                    recreate=self.recreate,
+                    tblproperties=self.tblproperties)

     def _get_top_row_as_list(self, file_name):
         with open(file_name, 'rt') as f:
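
As with the SQL transfer operators, tblproperties is threaded through to load_file on both the plain and the decompressed/header-stripped code paths above. A hypothetical instantiation (bucket key, field dict, and table are illustrative; the connection ids are the operator defaults):

    from airflow.operators.s3_to_hive_operator import S3ToHiveTransfer

    t = S3ToHiveTransfer(
        task_id='s3_to_hive_example',
        s3_key='s3://my-bucket/data/baby_names.csv',  # hypothetical key
        field_dict={'name': 'STRING', 'num': 'INT'},
        hive_table='staging.baby_names',
        delimiter=',',
        recreate=True,
        s3_conn_id='s3_default',
        hive_cli_conn_id='hive_cli_default',
        tblproperties={'skip.header.line.count': '1'},  # new in this commit
        dag=dag,                                        # assumes an existing DAG
    )
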
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b532d8d7/tests/operators/operators.py
----------------------------------------------------------------------
diff --git a/tests/operators/operators.py b/tests/operators/operators.py
index 62bc4bf..0f5abd5 100644
--- a/tests/operators/operators.py
+++ b/tests/operators/operators.py
@@ -283,3 +283,19 @@ class TransferTests(unittest.TestCase):
delimiter=",",
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
+
+ def test_mysql_to_hive_tblproperties(self):
+ # import airflow.operators
+ from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
+ sql = "SELECT * FROM baby_names LIMIT 1000;"
+ t = MySqlToHiveTransfer(
+ task_id='test_m2h',
+ mysql_conn_id='airflow_ci',
+ hive_cli_conn_id='beeline_default',
+ sql=sql,
+ hive_table='test_mysql_to_hive',
+ recreate=True,
+ delimiter=",",
+ tblproperties={'test_property':'test_value'},
+ dag=self.dag)
+ t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
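
Given the format string added in hive_hooks.py, the CREATE TABLE statement generated by this test should end with a clause like the following (the surrounding DDL depends on the rest of load_file):

    STORED AS textfile
    TBLPROPERTIES('test_property'='test_value')
    ;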