You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by sa...@apache.org on 2017/09/14 22:40:18 UTC
incubator-airflow git commit: [AIRFLOW-1309] Allow hive_to_druid to take tblproperties
Repository: incubator-airflow
Updated Branches:
refs/heads/master 9218a2167 -> 6632b0ce1
[AIRFLOW-1309] Allow hive_to_druid to take tblproperties
Dear Airflow maintainers,
Please accept this PR. I understand that it will
not be reviewed until I have checked off all the
steps below!
### JIRA
- [ ] My PR addresses the following [Airflow JIRA](https://issues.apache.org/jira/browse/AIRFLOW/)
issues and references them in the PR title. For example, "[AIRFLOW-XXX] My Airflow PR":
- https://issues.apache.org/jira/browse/AIRFLOW-1309
### Description
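- [ ] Here are some details about my PR, including
screenshots of any UI changes: Add an optional
`hive_tblproperties` dict to the HiveToDruidTransfer
operator; its entries are appended to the TBLPROPERTIES
of the Hive staging table. No UI changes.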
### Tests
- [ ] My PR adds the following unit tests __OR__
does not need testing for this extremely good
reason: Unit tests will be added in a follow-up.
### Commits
- [ ] My commits all reference JIRA issues in
their subject lines, and I have squashed multiple
commits if they address the same issue. In
addition, my commits follow the guidelines from
"[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
1. Subject is separated from body by a blank line
2. Subject is limited to 50 characters
3. Subject does not end with a period
4. Subject uses the imperative mood ("add", not
"adding")
5. Body wraps at 72 characters
6. Body explains "what" and "why", not "how"
Closes #2368 from saguziel/aguziel-update-hive-to-druid
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6632b0ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6632b0ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6632b0ce
Branch: refs/heads/master
Commit: 6632b0ce1cf01c82cb81e8a73af70212cf65ddd0
Parents: 9218a21
Author: Alex Guziel <al...@airbnb.com>
Authored: Thu Sep 14 15:40:13 2017 -0700
Committer: Alex Guziel <al...@airbnb.com>
Committed: Thu Sep 14 15:40:13 2017 -0700
----------------------------------------------------------------------
airflow/operators/hive_to_druid.py | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6632b0ce/airflow/operators/hive_to_druid.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py
index 7ac0b02..d7b1b82 100644
--- a/airflow/operators/hive_to_druid.py
+++ b/airflow/operators/hive_to_druid.py
@@ -46,6 +46,9 @@ class HiveToDruidTransfer(BaseOperator):
:param intervals: list of time intervals that defines segments, this
is passed as is to the json object
:type intervals: list
+ :param hive_tblproperties: additional properties for tblproperties in
+ hive for the staging table
+ :type hive_tblproperties: dict
"""
template_fields = ('sql', 'intervals')
@@ -67,6 +70,7 @@ class HiveToDruidTransfer(BaseOperator):
target_partition_size=-1,
query_granularity="NONE",
segment_granularity="DAY",
+ hive_tblproperties=None,
*args, **kwargs):
super(HiveToDruidTransfer, self).__init__(*args, **kwargs)
self.sql = sql
@@ -84,12 +88,14 @@ class HiveToDruidTransfer(BaseOperator):
self.hadoop_dependency_coordinates = hadoop_dependency_coordinates
self.druid_ingest_conn_id = druid_ingest_conn_id
self.metastore_conn_id = metastore_conn_id
+ self.hive_tblproperties = hive_tblproperties
def execute(self, context):
hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
self.logger.info("Extracting data from Hive")
hive_table = 'druid.' + context['task_instance_key_str'].replace('.', '_')
sql = self.sql.strip().strip(';')
+ tblproperties = ''.join([", '{}' = '{}'".format(k, v) for k, v in self.hive_tblproperties.items()])
hql = """\
SET mapred.output.compress=false;
SET hive.exec.compress.output=false;
@@ -97,7 +103,7 @@ class HiveToDruidTransfer(BaseOperator):
CREATE TABLE {hive_table}
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
- TBLPROPERTIES ('serialization.null.format' = '')
+ TBLPROPERTIES ('serialization.null.format' = ''{tblproperties})
AS
{sql}
""".format(**locals())