Posted to commits@airflow.apache.org by sa...@apache.org on 2017/09/14 22:40:18 UTC

incubator-airflow git commit: [AIRFLOW-1309] Allow hive_to_druid to take tblproperties

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 9218a2167 -> 6632b0ce1


[AIRFLOW-1309] Allow hive_to_druid to take tblproperties

Dear Airflow maintainers,

Please accept this PR. I understand that it will
not be reviewed until I have checked off all the
steps below!

### JIRA
- [ ] My PR addresses the following
[Airflow JIRA](https://issues.apache.org/jira/browse/AIRFLOW/)
issues and references them in the PR title. For
example, "[AIRFLOW-XXX] My Airflow PR"
    - https://issues.apache.org/jira/browse/AIRFLOW-1309

### Description
- [ ] Here are some details about my PR, including
screenshots of any UI changes: Add optional
tblproperties for the druid hook

### Tests
- [ ] My PR adds the following unit tests __OR__
does not need testing for this extremely good
reason: Will add

### Commits
- [ ] My commits all reference JIRA issues in
their subject lines, and I have squashed multiple
commits if they address the same issue. In
addition, my commits follow the guidelines from
"[How to write a good git commit
message](http://chris.beams.io/posts/git-
commit/)":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not
"adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Closes #2368 from saguziel/aguziel-update-hive-to-druid
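
For context, a minimal usage sketch of the new argument. Only
hive_tblproperties comes from this commit; the DAG, task id, SQL,
datasource, timestamp column, and property values below are illustrative
assumptions, not part of the change:

    # Hypothetical DAG snippet (illustrative values, not from this commit).
    from datetime import datetime
    from airflow.models import DAG
    from airflow.operators.hive_to_druid import HiveToDruidTransfer

    dag = DAG('hive_to_druid_example',            # assumed dag id
              start_date=datetime(2017, 9, 1),
              schedule_interval='@daily')

    load_events = HiveToDruidTransfer(
        task_id='load_events_to_druid',           # assumed task id
        sql='SELECT * FROM staging.events',       # assumed query
        druid_datasource='events',                # assumed datasource
        ts_dim='event_ts',                        # assumed timestamp dimension
        hive_tblproperties={'retention.days': '7'},  # new optional argument
        dag=dag,
    )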


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/6632b0ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/6632b0ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/6632b0ce

Branch: refs/heads/master
Commit: 6632b0ce1cf01c82cb81e8a73af70212cf65ddd0
Parents: 9218a21
Author: Alex Guziel <al...@airbnb.com>
Authored: Thu Sep 14 15:40:13 2017 -0700
Committer: Alex Guziel <al...@airbnb.com>
Committed: Thu Sep 14 15:40:13 2017 -0700

----------------------------------------------------------------------
 airflow/operators/hive_to_druid.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/6632b0ce/airflow/operators/hive_to_druid.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py
index 7ac0b02..d7b1b82 100644
--- a/airflow/operators/hive_to_druid.py
+++ b/airflow/operators/hive_to_druid.py
@@ -46,6 +46,9 @@ class HiveToDruidTransfer(BaseOperator):
     :param intervals: list of time intervals that defines segments, this
         is passed as is to the json object
     :type intervals: list
+    :param hive_tblproperties: additional properties for tblproperties in
+        hive for the staging table
+    :type hive_tblproperties: dict
     """
 
     template_fields = ('sql', 'intervals')
@@ -67,6 +70,7 @@ class HiveToDruidTransfer(BaseOperator):
             target_partition_size=-1,
             query_granularity="NONE",
             segment_granularity="DAY",
+            hive_tblproperties=None,
             *args, **kwargs):
         super(HiveToDruidTransfer, self).__init__(*args, **kwargs)
         self.sql = sql
@@ -84,12 +88,14 @@ class HiveToDruidTransfer(BaseOperator):
         self.hadoop_dependency_coordinates = hadoop_dependency_coordinates
         self.druid_ingest_conn_id = druid_ingest_conn_id
         self.metastore_conn_id = metastore_conn_id
+        self.hive_tblproperties = hive_tblproperties
 
     def execute(self, context):
         hive = HiveCliHook(hive_cli_conn_id=self.hive_cli_conn_id)
         self.logger.info("Extracting data from Hive")
         hive_table = 'druid.' + context['task_instance_key_str'].replace('.', '_')
         sql = self.sql.strip().strip(';')
+        tblproperties = ''.join([", '{}' = '{}'".format(k, v) for k, v in self.hive_tblproperties.items()])
         hql = """\
         SET mapred.output.compress=false;
         SET hive.exec.compress.output=false;
@@ -97,7 +103,7 @@ class HiveToDruidTransfer(BaseOperator):
         CREATE TABLE {hive_table}
         ROW FORMAT DELIMITED FIELDS TERMINATED BY  '\t'
         STORED AS TEXTFILE
-        TBLPROPERTIES ('serialization.null.format' = '')
+        TBLPROPERTIES ('serialization.null.format' = ''{tblproperties})
         AS
         {sql}
         """.format(**locals())