You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/09/17 23:28:05 UTC

incubator-gobblin git commit: [GOBBLIN-589] Add undecodable message count to Gobblin Kafka tracking event

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master fcc4d412a -> 8e974ef09


[GOBBLIN-589] Add undecodable message count to Gobblin Kafka tracking event

Closes #2457 from cshen98/metrics2


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/8e974ef0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/8e974ef0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/8e974ef0

Branch: refs/heads/master
Commit: 8e974ef094d2c3abb4da45a5d5806fda7ff52d0b
Parents: fcc4d41
Author: Carl Shen <ca...@gmail.com>
Authored: Mon Sep 17 16:27:59 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Mon Sep 17 16:27:59 2018 -0700

----------------------------------------------------------------------
 .../source/extractor/extract/kafka/KafkaExtractor.java  | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8e974ef0/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index 664446f..b0d38b2 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -60,7 +60,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   private static final Logger LOG = LoggerFactory.getLogger(KafkaExtractor.class);
 
   protected static final int INITIAL_PARTITION_IDX = -1;
-  protected static final Integer MAX_LOG_DECODING_ERRORS = 5;
+  protected static final Long MAX_LOG_DECODING_ERRORS = 5L;
 
   // Constants for event submission
   public static final String TOPIC = "topic";
@@ -70,6 +70,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   public static final String EXPECTED_HIGH_WATERMARK = "expectedHighWatermark";
   public static final String ELAPSED_TIME = "elapsedTime";
   public static final String PROCESSED_RECORD_COUNT = "processedRecordCount";
+  public static final String UNDECODABLE_MESSAGE_COUNT = "undecodableMessageCount";
   public static final String PARTITION_TOTAL_SIZE = "partitionTotalSize";
   public static final String AVG_RECORD_PULL_TIME = "avgRecordPullTime";
   public static final String READ_RECORD_TIME = "readRecordTime";
@@ -87,7 +88,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   protected final GobblinKafkaConsumerClient kafkaConsumerClient;
   private final ClassAliasResolver<GobblinKafkaConsumerClientFactory> kafkaConsumerClientResolver;
 
-  protected final Map<KafkaPartition, Integer> decodingErrorCount;
+  protected final Map<KafkaPartition, Long> decodingErrorCount;
   private final Map<KafkaPartition, Double> avgMillisPerRecord;
   private final Map<KafkaPartition, Long> avgRecordSizes;
   private final Map<KafkaPartition, Long> elapsedTime;
@@ -233,8 +234,8 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
           this.undecodableMessageCount++;
           if (shouldLogError()) {
             LOG.error(String.format("A record from partition %s cannot be decoded.", getCurrentPartition()), t);
-            incrementErrorCount();
           }
+          incrementErrorCount();
         }
       }
     }
@@ -339,7 +340,7 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
     if (this.decodingErrorCount.containsKey(getCurrentPartition())) {
       this.decodingErrorCount.put(getCurrentPartition(), this.decodingErrorCount.get(getCurrentPartition()) + 1);
     } else {
-      this.decodingErrorCount.put(getCurrentPartition(), 1);
+      this.decodingErrorCount.put(getCurrentPartition(), 1L);
     }
   }
 
@@ -459,6 +460,9 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
       tagsForPartition.put(READ_RECORD_TIME, "0");
     }
 
+    tagsForPartition.put(UNDECODABLE_MESSAGE_COUNT,
+        Long.toString(this.decodingErrorCount.getOrDefault(partition, 0L)));
+
     // Commit avg time to pull a record for each partition
     if (this.avgMillisPerRecord.containsKey(partition)) {
       double avgMillis = this.avgMillisPerRecord.get(partition);