You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/09/17 23:28:05 UTC
incubator-gobblin git commit: [GOBBLIN-589] Add undecodable message count to Gobblin Kafka tracking event
Repository: incubator-gobblin
Updated Branches:
refs/heads/master fcc4d412a -> 8e974ef09
[GOBBLIN-589] Add undecodable message count to Gobblin Kafka tracking event
Closes #2457 from cshen98/metrics2
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/8e974ef0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/8e974ef0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/8e974ef0
Branch: refs/heads/master
Commit: 8e974ef094d2c3abb4da45a5d5806fda7ff52d0b
Parents: fcc4d41
Author: Carl Shen <ca...@gmail.com>
Authored: Mon Sep 17 16:27:59 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Sep 17 16:27:59 2018 -0700
----------------------------------------------------------------------
.../source/extractor/extract/kafka/KafkaExtractor.java | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8e974ef0/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index 664446f..b0d38b2 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -60,7 +60,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaExtractor.class);
protected static final int INITIAL_PARTITION_IDX = -1;
- protected static final Integer MAX_LOG_DECODING_ERRORS = 5;
+ protected static final Long MAX_LOG_DECODING_ERRORS = 5L;
// Constants for event submission
public static final String TOPIC = "topic";
@@ -70,6 +70,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
public static final String EXPECTED_HIGH_WATERMARK = "expectedHighWatermark";
public static final String ELAPSED_TIME = "elapsedTime";
public static final String PROCESSED_RECORD_COUNT = "processedRecordCount";
+ public static final String UNDECODABLE_MESSAGE_COUNT = "undecodableMessageCount";
public static final String PARTITION_TOTAL_SIZE = "partitionTotalSize";
public static final String AVG_RECORD_PULL_TIME = "avgRecordPullTime";
public static final String READ_RECORD_TIME = "readRecordTime";
@@ -87,7 +88,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
protected final GobblinKafkaConsumerClient kafkaConsumerClient;
private final ClassAliasResolver<GobblinKafkaConsumerClientFactory> kafkaConsumerClientResolver;
- protected final Map<KafkaPartition, Integer> decodingErrorCount;
+ protected final Map<KafkaPartition, Long> decodingErrorCount;
private final Map<KafkaPartition, Double> avgMillisPerRecord;
private final Map<KafkaPartition, Long> avgRecordSizes;
private final Map<KafkaPartition, Long> elapsedTime;
@@ -233,8 +234,8 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
this.undecodableMessageCount++;
if (shouldLogError()) {
LOG.error(String.format("A record from partition %s cannot be decoded.", getCurrentPartition()), t);
- incrementErrorCount();
}
+ incrementErrorCount();
}
}
}
@@ -339,7 +340,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
if (this.decodingErrorCount.containsKey(getCurrentPartition())) {
this.decodingErrorCount.put(getCurrentPartition(), this.decodingErrorCount.get(getCurrentPartition()) + 1);
} else {
- this.decodingErrorCount.put(getCurrentPartition(), 1);
+ this.decodingErrorCount.put(getCurrentPartition(), 1L);
}
}
@@ -459,6 +460,9 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
tagsForPartition.put(READ_RECORD_TIME, "0");
}
+ tagsForPartition.put(UNDECODABLE_MESSAGE_COUNT,
+ Long.toString(this.decodingErrorCount.getOrDefault(partition, 0L)));
+
// Commit avg time to pull a record for each partition
if (this.avgMillisPerRecord.containsKey(partition)) {
double avgMillis = this.avgMillisPerRecord.get(partition);