You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/12/04 17:46:37 UTC
incubator-gobblin git commit: [GOBBLIN-643] Fix NPE when closing
KafkaExtractor
Repository: incubator-gobblin
Updated Branches:
refs/heads/master a6dfdc6d4 -> 65123a606
[GOBBLIN-643] Fix NPE when closing KafkaExtractor
Closes #2513 from jack-moseley/npe-fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/65123a60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/65123a60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/65123a60
Branch: refs/heads/master
Commit: 65123a60697b4df824fc361170feddef704b7202
Parents: a6dfdc6
Author: Jack Moseley <jm...@linkedin.com>
Authored: Tue Dec 4 09:46:40 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Dec 4 09:46:40 2018 -0800
----------------------------------------------------------------------
.../source/extractor/extract/kafka/KafkaExtractor.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/65123a60/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index fb22f9d..e54a4b6 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -435,12 +435,12 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
Long.toString(KafkaUtils.getPropAsLongFromSingleOrMultiWorkUnitState(this.workUnitState,
KafkaSource.PREVIOUS_STOP_FETCH_EPOCH_TIME, partitionId)));
- tagsForPartition.put(KafkaSource.START_FETCH_EPOCH_TIME, Long.toString(this.startFetchEpochTime.get(partition)));
- tagsForPartition.put(KafkaSource.STOP_FETCH_EPOCH_TIME, Long.toString(this.stopFetchEpochTime.get(partition)));
+ tagsForPartition.put(KafkaSource.START_FETCH_EPOCH_TIME, Long.toString(this.startFetchEpochTime.getOrDefault(partition, 0L)));
+ tagsForPartition.put(KafkaSource.STOP_FETCH_EPOCH_TIME, Long.toString(this.stopFetchEpochTime.getOrDefault(partition, 0L)));
this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.START_FETCH_EPOCH_TIME, partitionId),
- Long.toString(this.startFetchEpochTime.get(partition)));
+ Long.toString(this.startFetchEpochTime.getOrDefault(partition, 0L)));
this.workUnitState.setProp(KafkaUtils.getPartitionPropName(KafkaSource.STOP_FETCH_EPOCH_TIME, partitionId),
- Long.toString(this.stopFetchEpochTime.get(partition)));
+ Long.toString(this.stopFetchEpochTime.getOrDefault(partition, 0L)));
if (this.processedRecordCount.containsKey(partition)) {
tagsForPartition.put(PROCESSED_RECORD_COUNT, Long.toString(this.processedRecordCount.get(partition)));