You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by da...@apache.org on 2016/06/25 17:35:24 UTC
incubator-airflow git commit: [AIRFLOW-280] clean up tmp druid table
no matter if an ingestion job succeeds or not
Repository: incubator-airflow
Updated Branches:
refs/heads/master 518e0073a -> 3b84bcb3e
[AIRFLOW-280] clean up tmp druid table no matter if an ingestion job succeeds or not
Closes #1624 from hongbozeng/cleanup_druid
clean up tmp druid table no matter if the ingestion job succeeds or not
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3b84bcb3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3b84bcb3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3b84bcb3
Branch: refs/heads/master
Commit: 3b84bcb3eef629da69d10db8a99418afd0386193
Parents: 518e007
Author: Hongbo Zeng <ho...@airbnb.com>
Authored: Sat Jun 25 10:35:00 2016 -0700
Committer: Dan Davydov <da...@airbnb.com>
Committed: Sat Jun 25 10:35:00 2016 -0700
----------------------------------------------------------------------
airflow/operators/hive_to_druid.py | 27 ++++++++++++++-------------
1 file changed, 14 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3b84bcb3/airflow/operators/hive_to_druid.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py
index 6d73e17..5ed5145 100644
--- a/airflow/operators/hive_to_druid.py
+++ b/airflow/operators/hive_to_druid.py
@@ -116,16 +116,17 @@ class HiveToDruidTransfer(BaseOperator):
logging.info("Inserting rows into Druid")
logging.info("HDFS path: " + static_path)
- druid.load_from_hdfs(
- datasource=self.druid_datasource,
- intervals=self.intervals,
- static_path=static_path, ts_dim=self.ts_dim,
- columns=columns, num_shards=self.num_shards, target_partition_size=self.target_partition_size,
- metric_spec=self.metric_spec, hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
- logging.info("Load seems to have succeeded!")
-
- logging.info(
- "Cleaning up by dropping the temp "
- "Hive table {}".format(hive_table))
- hql = "DROP TABLE IF EXISTS {}".format(hive_table)
- hive.run_cli(hql)
+ try:
+ druid.load_from_hdfs(
+ datasource=self.druid_datasource,
+ intervals=self.intervals,
+ static_path=static_path, ts_dim=self.ts_dim,
+ columns=columns, num_shards=self.num_shards, target_partition_size=self.target_partition_size,
+ metric_spec=self.metric_spec, hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
+ logging.info("Load seems to have succeeded!")
+ finally:
+ logging.info(
+ "Cleaning up by dropping the temp "
+ "Hive table {}".format(hive_table))
+ hql = "DROP TABLE IF EXISTS {}".format(hive_table)
+ hive.run_cli(hql)