You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2020/10/27 17:47:43 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1278] Close HiveRegister in completion action usage
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a260f8f [GOBBLIN-1278] Close HiveRegister in completion action usage
a260f8f is described below
commit a260f8f58b67a300f44720cb5a115596ac77ba7e
Author: zhchen <zh...@linkedin.com>
AuthorDate: Tue Oct 27 10:47:36 2020 -0700
[GOBBLIN-1278] Close HiveRegister in completion action usage
Closes #3118 from zxcware/hive
---
.../action/CompactionHiveRegistrationAction.java | 12 ++++---
.../action/CompactionWatermarkAction.java | 38 +++++++++++-----------
2 files changed, 27 insertions(+), 23 deletions(-)
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
index 0c63107..b5c3ca1 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
@@ -86,8 +86,12 @@ public class CompactionHiveRegistrationAction implements CompactionCompleteActio
log.warn("Will not emit events in {} as EventSubmitter is null", getClass().getName());
}
- if (state.contains(ConfigurationKeys.HIVE_REGISTRATION_POLICY)) {
- HiveRegister hiveRegister = HiveRegister.get(state);
+ if (!state.contains(ConfigurationKeys.HIVE_REGISTRATION_POLICY)) {
+ log.info("Will skip hive registration as {} is not configured.", ConfigurationKeys.HIVE_REGISTRATION_POLICY);
+ return;
+ }
+
+ try (HiveRegister hiveRegister = HiveRegister.get(state)) {
HiveRegistrationPolicy hiveRegistrationPolicy = HiveRegistrationPolicyBase.getPolicy(state);
List<String> paths = new ArrayList<>();
@@ -107,8 +111,8 @@ public class CompactionHiveRegistrationAction implements CompactionCompleteActio
// submit events for hive registration
if (eventSubmitter != null) {
- Map<String, String> eventMetadataMap = ImmutableMap.of(CompactionSlaEventHelper.DATASET_URN, dataset.datasetURN(),
- CompactionSlaEventHelper.HIVE_REGISTRATION_PATHS, Joiner.on(',').join(paths));
+ Map<String, String> eventMetadataMap = ImmutableMap
+ .of(CompactionSlaEventHelper.DATASET_URN, dataset.datasetURN(), CompactionSlaEventHelper.HIVE_REGISTRATION_PATHS, Joiner.on(',').join(paths));
this.eventSubmitter.submit(CompactionSlaEventHelper.COMPACTION_HIVE_REGISTRATION_EVENT, eventMetadataMap);
}
}
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java
index cd3d98d..4eafb2b 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java
@@ -81,25 +81,25 @@ public class CompactionWatermarkAction implements CompactionCompleteAction<FileS
String hiveDb = dbAndTable.getDb();
String hiveTable = dbAndTable.getTable();
- HiveRegister hiveRegister = HiveRegister.get(state);
- Optional<HiveTable> tableOptional = hiveRegister.getTable(hiveDb, hiveTable);
- if (!tableOptional.isPresent()) {
- log.info("Table {}.{} not found. Skip publishing compaction watermarks", hiveDb, hiveTable);
- return;
- }
-
- HiveTable table = tableOptional.get();
- State tableProps = table.getProps();
- boolean shouldUpdate =
- mayUpdateWatermark(dataset, tableProps, CompactionWatermarkChecker.COMPACTION_WATERMARK, compactionWatermark);
- if (mayUpdateWatermark(dataset, tableProps, CompactionWatermarkChecker.COMPLETION_COMPACTION_WATERMARK,
- completeCompactionWatermark)) {
- shouldUpdate = true;
- }
-
- if (shouldUpdate) {
- log.info("Alter table {}.{} to publish watermarks {}", hiveDb, hiveTable, tableProps);
- hiveRegister.alterTable(table);
+ try (HiveRegister hiveRegister = HiveRegister.get(state)) {
+ Optional<HiveTable> tableOptional = hiveRegister.getTable(hiveDb, hiveTable);
+ if (!tableOptional.isPresent()) {
+ log.info("Table {}.{} not found. Skip publishing compaction watermarks", hiveDb, hiveTable);
+ return;
+ }
+
+ HiveTable table = tableOptional.get();
+ State tableProps = table.getProps();
+ boolean shouldUpdate = mayUpdateWatermark(dataset, tableProps, CompactionWatermarkChecker.COMPACTION_WATERMARK, compactionWatermark);
+ if (mayUpdateWatermark(dataset, tableProps, CompactionWatermarkChecker.COMPLETION_COMPACTION_WATERMARK,
+ completeCompactionWatermark)) {
+ shouldUpdate = true;
+ }
+
+ if (shouldUpdate) {
+ log.info("Alter table {}.{} to publish watermarks {}", hiveDb, hiveTable, tableProps);
+ hiveRegister.alterTable(table);
+ }
}
}