You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2021/05/13 22:37:59 UTC
[gobblin] 01/02: [GOBBLIN-1442]Fix the bug of NoSuchElement
Exception in HiveWriter
This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
commit 2c314647f803ef7d91b90d276199e7b58d5f8d7d
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Tue May 11 14:59:06 2021 -0700
[GOBBLIN-1442]Fix the bug of NoSuchElement Exception in HiveWriter
---
.../gobblin/hive/writer/HiveMetadataWriter.java | 29 +++++++++++-----------
1 file changed, 15 insertions(+), 14 deletions(-)
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index a4550bc..b11948c 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -95,20 +95,21 @@ public class HiveMetadataWriter implements MetadataWriter {
@Override
public void flush(String dbName, String tableName) throws IOException {
String tableKey = tableNameJoiner.join(dbName, tableName);
- log.info("start to flush table: " + tableKey);
- HashMap<List<String>, ListenableFuture<Void>> executionMap =
- this.currentExecutionMap.computeIfAbsent(tableKey, s -> new HashMap<>());
- //iterator all execution to get the result to make sure they all succeeded
- for (HashMap.Entry<List<String>, ListenableFuture<Void>> execution : executionMap.entrySet()) {
- try {
- execution.getValue().get(timeOutSeconds, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- log.error("Error when getting the result of registration for table" + tableKey);
- throw new RuntimeException(e);
+ if(this.currentExecutionMap.containsKey(tableKey)) {
+ log.info("start to flush table: " + tableKey);
+ HashMap<List<String>, ListenableFuture<Void>> executionMap = this.currentExecutionMap.get(tableKey);
+ //iterator all execution to get the result to make sure they all succeeded
+ for (HashMap.Entry<List<String>, ListenableFuture<Void>> execution : executionMap.entrySet()) {
+ try {
+ execution.getValue().get(timeOutSeconds, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ log.error("Error when getting the result of registration for table" + tableKey);
+ throw new RuntimeException(e);
+ }
}
+ executionMap.clear();
+ log.info("finish flushing table: " + tableKey);
}
- executionMap.clear();
- log.info("finish flushing table: " + tableKey);
}
public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap,
@@ -131,7 +132,7 @@ public class HiveMetadataWriter implements MetadataWriter {
//Calculate the topic name from gmce, fall back to topic.name in hive spec which can also be null
//todo: make topicName fall back to topic.name in hive spec so that we can also get schema for re-write operation
String topicName = null;
- if (gmce.getTopicPartitionOffsetsRange() != null) {
+ if (gmce.getTopicPartitionOffsetsRange() != null && !gmce.getTopicPartitionOffsetsRange().isEmpty()) {
String topicPartitionString = gmce.getTopicPartitionOffsetsRange().keySet().iterator().next();
//In case the topic name is not the table name or the topic name contains '-'
topicName = topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
@@ -278,7 +279,7 @@ public class HiveMetadataWriter implements MetadataWriter {
if (whiteistBlacklist.acceptTable(tableSpec.getTable().getDbName(), tableSpec.getTable().getTableName())) {
write(gmce, newSpecsMap, oldSpecsMap, tableSpec);
} else {
- log.info(String.format("Skip table %s.%s since it's blacklisted", tableSpec.getTable().getDbName(),
+ log.debug(String.format("Skip table %s.%s since it's blacklisted", tableSpec.getTable().getDbName(),
tableSpec.getTable().getTableName()));
}
}