You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2020/10/27 17:47:43 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1278] Close HiveRegister in completion action usage

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a260f8f  [GOBBLIN-1278] Close HiveRegister in completion action usage
a260f8f is described below

commit a260f8f58b67a300f44720cb5a115596ac77ba7e
Author: zhchen <zh...@linkedin.com>
AuthorDate: Tue Oct 27 10:47:36 2020 -0700

    [GOBBLIN-1278] Close HiveRegister in completion action usage
    
    Closes #3118 from zxcware/hive
---
 .../action/CompactionHiveRegistrationAction.java   | 12 ++++---
 .../action/CompactionWatermarkAction.java          | 38 +++++++++++-----------
 2 files changed, 27 insertions(+), 23 deletions(-)

diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
index 0c63107..b5c3ca1 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionHiveRegistrationAction.java
@@ -86,8 +86,12 @@ public class CompactionHiveRegistrationAction implements CompactionCompleteActio
       log.warn("Will not emit events in {} as EventSubmitter is null", getClass().getName());
     }
 
-    if (state.contains(ConfigurationKeys.HIVE_REGISTRATION_POLICY)) {
-      HiveRegister hiveRegister = HiveRegister.get(state);
+    if (!state.contains(ConfigurationKeys.HIVE_REGISTRATION_POLICY)) {
+      log.info("Will skip hive registration as {} is not configured.", ConfigurationKeys.HIVE_REGISTRATION_POLICY);
+      return;
+    }
+
+    try (HiveRegister hiveRegister = HiveRegister.get(state)) {
       HiveRegistrationPolicy hiveRegistrationPolicy = HiveRegistrationPolicyBase.getPolicy(state);
 
       List<String> paths = new ArrayList<>();
@@ -107,8 +111,8 @@ public class CompactionHiveRegistrationAction implements CompactionCompleteActio
 
       // submit events for hive registration
       if (eventSubmitter != null) {
-        Map<String, String> eventMetadataMap = ImmutableMap.of(CompactionSlaEventHelper.DATASET_URN, dataset.datasetURN(),
-            CompactionSlaEventHelper.HIVE_REGISTRATION_PATHS, Joiner.on(',').join(paths));
+        Map<String, String> eventMetadataMap = ImmutableMap
+            .of(CompactionSlaEventHelper.DATASET_URN, dataset.datasetURN(), CompactionSlaEventHelper.HIVE_REGISTRATION_PATHS, Joiner.on(',').join(paths));
         this.eventSubmitter.submit(CompactionSlaEventHelper.COMPACTION_HIVE_REGISTRATION_EVENT, eventMetadataMap);
       }
     }
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java
index cd3d98d..4eafb2b 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionWatermarkAction.java
@@ -81,25 +81,25 @@ public class CompactionWatermarkAction implements CompactionCompleteAction<FileS
     String hiveDb = dbAndTable.getDb();
     String hiveTable = dbAndTable.getTable();
 
-    HiveRegister hiveRegister = HiveRegister.get(state);
-    Optional<HiveTable> tableOptional = hiveRegister.getTable(hiveDb, hiveTable);
-    if (!tableOptional.isPresent()) {
-      log.info("Table {}.{} not found. Skip publishing compaction watermarks", hiveDb, hiveTable);
-      return;
-    }
-
-    HiveTable table = tableOptional.get();
-    State tableProps = table.getProps();
-    boolean shouldUpdate =
-        mayUpdateWatermark(dataset, tableProps, CompactionWatermarkChecker.COMPACTION_WATERMARK, compactionWatermark);
-    if (mayUpdateWatermark(dataset, tableProps, CompactionWatermarkChecker.COMPLETION_COMPACTION_WATERMARK,
-        completeCompactionWatermark)) {
-      shouldUpdate = true;
-    }
-
-    if (shouldUpdate) {
-      log.info("Alter table {}.{} to publish watermarks {}", hiveDb, hiveTable, tableProps);
-      hiveRegister.alterTable(table);
+    try (HiveRegister hiveRegister = HiveRegister.get(state)) {
+      Optional<HiveTable> tableOptional = hiveRegister.getTable(hiveDb, hiveTable);
+      if (!tableOptional.isPresent()) {
+        log.info("Table {}.{} not found. Skip publishing compaction watermarks", hiveDb, hiveTable);
+        return;
+      }
+
+      HiveTable table = tableOptional.get();
+      State tableProps = table.getProps();
+      boolean shouldUpdate = mayUpdateWatermark(dataset, tableProps, CompactionWatermarkChecker.COMPACTION_WATERMARK, compactionWatermark);
+      if (mayUpdateWatermark(dataset, tableProps, CompactionWatermarkChecker.COMPLETION_COMPACTION_WATERMARK,
+          completeCompactionWatermark)) {
+        shouldUpdate = true;
+      }
+
+      if (shouldUpdate) {
+        log.info("Alter table {}.{} to publish watermarks {}", hiveDb, hiveTable, tableProps);
+        hiveRegister.alterTable(table);
+      }
     }
   }