You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/12/04 17:46:37 UTC

incubator-gobblin git commit: [GOBBLIN-643] Fix NPE when closing KafkaExtractor

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master a6dfdc6d4 -> 65123a606


[GOBBLIN-643] Fix NPE when closing KafkaExtractor

Closes #2513 from jack-moseley/npe-fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/65123a60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/65123a60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/65123a60

Branch: refs/heads/master
Commit: 65123a60697b4df824fc361170feddef704b7202
Parents: a6dfdc6
Author: Jack Moseley <jm...@linkedin.com>
Authored: Tue Dec 4 09:46:40 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Dec 4 09:46:40 2018 -0800

----------------------------------------------------------------------
 .../source/extractor/extract/kafka/KafkaExtractor.java       | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/65123a60/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index fb22f9d..e54a4b6 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -435,12 +435,12 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
         Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
             KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME, partitionId)));
 
-    tagsForPartition.put(KafkaSource.START_FETCH_EPOCH_TIME, Long.toString(this.startFetchEpochTime.get(partition)));
-    tagsForPartition.put(KafkaSource.STOP_FETCH_EPOCH_TIME, Long.toString(this.stopFetchEpochTime.get(partition)));
+    tagsForPartition.put(KafkaSource.START_FETCH_EPOCH_TIME, Long.toString(this.startFetchEpochTime.getOrDefault(partition, 0L)));
+    tagsForPartition.put(KafkaSource.STOP_FETCH_EPOCH_TIME, Long.toString(this.stopFetchEpochTime.getOrDefault(partition, 0L)));
     this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.START_FETCH_EPOCH_TIME, partitionId),
-        Long.toString(this.startFetchEpochTime.get(partition)));
+        Long.toString(this.startFetchEpochTime.getOrDefault(partition, 0L)));
     this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.STOP_FETCH_EPOCH_TIME, partitionId),
-        Long.toString(this.stopFetchEpochTime.get(partition)));
+        Long.toString(this.stopFetchEpochTime.getOrDefault(partition, 0L)));
 
     if (this.processedRecordCount.containsKey(partition)) {
       tagsForPartition.put(PROCESSED_RECORD_COUNT, Long.toString(this.processedRecordCount.get(partition)));