You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2021/05/13 22:37:59 UTC

[gobblin] 01/02: [GOBBLIN-1442]Fix the bug of NoSuchElement Exception in HiveWriter

This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git

commit 2c314647f803ef7d91b90d276199e7b58d5f8d7d
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Tue May 11 14:59:06 2021 -0700

    [GOBBLIN-1442]Fix the bug of NoSuchElement Exception in HiveWriter
---
 .../gobblin/hive/writer/HiveMetadataWriter.java    | 29 +++++++++++-----------
 1 file changed, 15 insertions(+), 14 deletions(-)

diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index a4550bc..b11948c 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -95,20 +95,21 @@ public class HiveMetadataWriter implements MetadataWriter {
   @Override
   public void flush(String dbName, String tableName) throws IOException {
     String tableKey = tableNameJoiner.join(dbName, tableName);
-    log.info("start to flush table: " + tableKey);
-    HashMap<List<String>, ListenableFuture<Void>> executionMap =
-        this.currentExecutionMap.computeIfAbsent(tableKey, s -> new HashMap<>());
-    //iterator all execution to get the result to make sure they all succeeded
-    for (HashMap.Entry<List<String>, ListenableFuture<Void>> execution : executionMap.entrySet()) {
-      try {
-        execution.getValue().get(timeOutSeconds, TimeUnit.SECONDS);
-      } catch (InterruptedException | ExecutionException | TimeoutException e) {
-        log.error("Error when getting the result of registration for table" + tableKey);
-        throw new RuntimeException(e);
+    if(this.currentExecutionMap.containsKey(tableKey)) {
+      log.info("start to flush table: " + tableKey);
+      HashMap<List<String>, ListenableFuture<Void>> executionMap = this.currentExecutionMap.get(tableKey);
+      //iterator all execution to get the result to make sure they all succeeded
+      for (HashMap.Entry<List<String>, ListenableFuture<Void>> execution : executionMap.entrySet()) {
+        try {
+          execution.getValue().get(timeOutSeconds, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+          log.error("Error when getting the result of registration for table" + tableKey);
+          throw new RuntimeException(e);
+        }
       }
+      executionMap.clear();
+      log.info("finish flushing table: " + tableKey);
     }
-    executionMap.clear();
-    log.info("finish flushing table: " + tableKey);
   }
 
   public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap,
@@ -131,7 +132,7 @@ public class HiveMetadataWriter implements MetadataWriter {
     //Calculate the topic name from gmce, fall back to topic.name in hive spec which can also be null
     //todo: make topicName fall back to topic.name in hive spec so that we can also get schema for re-write operation
     String topicName = null;
-    if (gmce.getTopicPartitionOffsetsRange() != null) {
+    if (gmce.getTopicPartitionOffsetsRange() != null && !gmce.getTopicPartitionOffsetsRange().isEmpty()) {
       String topicPartitionString = gmce.getTopicPartitionOffsetsRange().keySet().iterator().next();
       //In case the topic name is not the table name or the topic name contains '-'
       topicName = topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
@@ -278,7 +279,7 @@ public class HiveMetadataWriter implements MetadataWriter {
     if (whiteistBlacklist.acceptTable(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName())) {
       write(gmce, newSpecsMap, oldSpecsMap, tableSpec);
     } else {
-      log.info(String.format("Skip table %s.%s since it's blacklisted", tableSpec.getTable().getDbName(),
+      log.debug(String.format("Skip table %s.%s since it's blacklisted", tableSpec.getTable().getDbName(),
           tableSpec.getTable().getTableName()));
     }
   }