You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by da...@apache.org on 2016/06/25 17:35:24 UTC

incubator-airflow git commit: [AIRFLOW-280] clean up tmp druid table no matter if an ingestion job succeeds or not

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 518e0073a -> 3b84bcb3e


[AIRFLOW-280] clean up tmp druid table no matter if an ingestion job succeeds or not

Closes #1624 from hongbozeng/cleanup_druid

clean up tmp druid table no matter if the ingestion job succeeds or not


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3b84bcb3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3b84bcb3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3b84bcb3

Branch: refs/heads/master
Commit: 3b84bcb3eef629da69d10db8a99418afd0386193
Parents: 518e007
Author: Hongbo Zeng <ho...@airbnb.com>
Authored: Sat Jun 25 10:35:00 2016 -0700
Committer: Dan Davydov <da...@airbnb.com>
Committed: Sat Jun 25 10:35:00 2016 -0700

----------------------------------------------------------------------
 airflow/operators/hive_to_druid.py | 27 ++++++++++++++-------------
 1 file changed, 14 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3b84bcb3/airflow/operators/hive_to_druid.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py
index 6d73e17..5ed5145 100644
--- a/airflow/operators/hive_to_druid.py
+++ b/airflow/operators/hive_to_druid.py
@@ -116,16 +116,17 @@ class HiveToDruidTransfer(BaseOperator):
         logging.info("Inserting rows into Druid")
         logging.info("HDFS path: " + static_path)
 
-        druid.load_from_hdfs(
-            datasource=self.druid_datasource,
-            intervals=self.intervals,
-            static_path=static_path, ts_dim=self.ts_dim,
-            columns=columns, num_shards=self.num_shards, target_partition_size=self.target_partition_size,
-            metric_spec=self.metric_spec, hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
-        logging.info("Load seems to have succeeded!")
-
-        logging.info(
-            "Cleaning up by dropping the temp "
-            "Hive table {}".format(hive_table))
-        hql = "DROP TABLE IF EXISTS {}".format(hive_table)
-        hive.run_cli(hql)
+        try:
+            druid.load_from_hdfs(
+                datasource=self.druid_datasource,
+                intervals=self.intervals,
+                static_path=static_path, ts_dim=self.ts_dim,
+                columns=columns, num_shards=self.num_shards, target_partition_size=self.target_partition_size,
+                metric_spec=self.metric_spec, hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
+            logging.info("Load seems to have succeeded!")
+        finally:
+            logging.info(
+                "Cleaning up by dropping the temp "
+                "Hive table {}".format(hive_table))
+            hql = "DROP TABLE IF EXISTS {}".format(hive_table)
+            hive.run_cli(hql)