You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2020/03/27 18:34:27 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1100] Set average fetch time in the KafkaExtractor even when…

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 569f260  [GOBBLIN-1100] Set average fetch time in the KafkaExtractor even when…
569f260 is described below

commit 569f260ad23325a393b4b8a273951e907c09d5d3
Author: Hung Tran <hu...@linkedin.com>
AuthorDate: Fri Mar 27 11:34:20 2020 -0700

    [GOBBLIN-1100] Set average fetch time in the KafkaExtractor even when metrics are disabled
    
    Closes #2941 from htran1/kafka-missing-avg-fetch-time
---
 .../extractor/extract/kafka/KafkaExtractor.java    | 10 +++++-
 .../extract/kafka/KafkaExtractorStatsTracker.java  | 25 ++-------------
 .../kafka/KafkaExtractorStatsTrackerTest.java      | 37 +++++++++++++++++++---
 3 files changed, 44 insertions(+), 28 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index 4055fb0..2006a57 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.source.extractor.extract.kafka;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -28,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 import lombok.Getter;
 
@@ -319,8 +321,14 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
     this.workUnitState.setProp(ConfigurationKeys.ERROR_PARTITION_COUNT, this.statsTracker.getErrorPartitionCount());
     this.workUnitState.setProp(ConfigurationKeys.ERROR_MESSAGE_UNDECODABLE_COUNT, this.statsTracker.getUndecodableMessageCount());
     this.workUnitState.setActualHighWatermark(this.nextWatermark);
+
+    // Need to call this even when not emitting metrics because some state, such as the average pull time,
+    // is updated when the tags are generated
+    Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap = this.statsTracker.generateTagsForPartitions(
+        this.lowWatermark, this.highWatermark, this.nextWatermark, Maps.newHashMap());
+
     if (isInstrumentationEnabled()) {
-      this.statsTracker.emitTrackingEvents(getMetricContext(), this.lowWatermark, this.highWatermark, this.nextWatermark);
+      this.statsTracker.emitTrackingEvents(getMetricContext(), tagsForPartitionsMap);
     }
   }
 
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index a1f4520..2913b54 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -396,30 +396,9 @@ public class KafkaExtractorStatsTracker {
   /**
    * Emit Tracking events reporting the various statistics to be consumed by a monitoring application.
    * @param context the current {@link MetricContext}
-   * @param lowWatermark begin Kafka offset for each topic partition
-   * @param highWatermark the expected last Kafka offset for each topic partition to be consumed by the Extractor
-   * @param nextWatermark the offset of next valid message for each Kafka topic partition consumed by the Extractor
+   * @param tagsForPartitionsMap tags for each partition
    */
-  public void emitTrackingEvents(MetricContext context, MultiLongWatermark lowWatermark, MultiLongWatermark highWatermark,
-      MultiLongWatermark nextWatermark) {
-    emitTrackingEventsWithAdditionalTags(context, lowWatermark, highWatermark, nextWatermark, Maps.newHashMap());
-  }
-
-  /**
-   * Emit Tracking events reporting the various statistics to be consumed by a monitoring application, with additional
-   * map representing tags beyond what are constructed in {@link #createTagsForPartition(int, MultiLongWatermark, MultiLongWatermark, MultiLongWatermark) }
-   *
-   * Choose to not to make createTagsForPartition extensible to avoid additional derived class just for additional k-v pairs
-   * in the tag maps.
-   *
-   * @param additionalTags caller-provided mapping from {@link KafkaPartition} to {@link Map<String, String>}, which will
-   *                       be merged with result of {@link #createTagsForPartition}.
-   */
-  public void emitTrackingEventsWithAdditionalTags(MetricContext context, MultiLongWatermark lowWatermark, MultiLongWatermark highWatermark,
-      MultiLongWatermark nextWatermark, Map<KafkaPartition, Map<String, String>> additionalTags) {
-    Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap =
-        generateTagsForPartitions(lowWatermark, highWatermark, nextWatermark, additionalTags);
-
+  public void emitTrackingEvents(MetricContext context, Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap) {
     for (Map.Entry<KafkaPartition, Map<String, String>> eventTags : tagsForPartitionsMap.entrySet()) {
       EventSubmitter.Builder eventSubmitterBuilder = new EventSubmitter.Builder(context, GOBBLIN_KAFKA_NAMESPACE);
       eventSubmitterBuilder.addMetadata(this.taskEventMetadataGenerator.getMetadata(workUnitState, KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME));
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
index 278ff9b..1f959f2 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
@@ -33,10 +33,11 @@ import com.google.common.collect.ImmutableMap;
 
 import org.apache.gobblin.configuration.WorkUnitState;
 
-
+@Test(singleThreaded = true)
 public class KafkaExtractorStatsTrackerTest {
   List<KafkaPartition> kafkaPartitions = new ArrayList<>();
   private KafkaExtractorStatsTracker extractorStatsTracker;
+  private WorkUnitState workUnitState;
   final static KafkaPartition PARTITION0 =  new KafkaPartition.Builder().withTopicName("test-topic").withId(0).build();
   final static KafkaPartition PARTITION1 =  new KafkaPartition.Builder().withTopicName("test-topic").withId(1).build();
 
@@ -44,9 +45,9 @@ public class KafkaExtractorStatsTrackerTest {
   public void setUp() {
     kafkaPartitions.add(PARTITION0);
     kafkaPartitions.add(PARTITION1);
-    WorkUnitState workUnitState = new WorkUnitState();
-    workUnitState.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, 10L);
-    workUnitState.setProp(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, true);
+    this.workUnitState = new WorkUnitState();
+    this.workUnitState.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, 10L);
+    this.workUnitState.setProp(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, true);
     this.extractorStatsTracker = new KafkaExtractorStatsTracker(workUnitState, kafkaPartitions);
   }
 
@@ -195,8 +196,36 @@ public class KafkaExtractorStatsTrackerTest {
     MultiLongWatermark nextWatermark = new MultiLongWatermark(Arrays.asList(new Long(15), new Long(25)));
     Map<KafkaPartition, Map<String, String>> addtionalTags =
         ImmutableMap.of(PARTITION0, ImmutableMap.of("testKey", "testValue"));
+
+    this.workUnitState.removeProp(KafkaUtils.getPartitionPropName(KafkaSource.START_FETCH_EPOCH_TIME, 0));
+    this.workUnitState.removeProp(KafkaUtils.getPartitionPropName(KafkaSource.STOP_FETCH_EPOCH_TIME, 0));
+    KafkaUtils.setPartitionAvgRecordMillis(this.workUnitState, PARTITION0, 0);
+
+    KafkaExtractorStatsTracker.ExtractorStats extractorStats = this.extractorStatsTracker.getStatsMap()
+        .get(kafkaPartitions.get(0));
+
+    extractorStats.setStartFetchEpochTime(1000);
+    extractorStats.setStopFetchEpochTime(10000);
+    extractorStats.setAvgMillisPerRecord(10.1);
+
     Map<KafkaPartition, Map<String, String>> result =
         extractorStatsTracker.generateTagsForPartitions(lowWatermark, highWatermark, nextWatermark, addtionalTags);
+
+    // generateTagsForPartitions will set the following in the workUnitState
+    Assert.assertEquals(this.workUnitState.getPropAsLong(
+        KafkaUtils.getPartitionPropName(KafkaSource.START_FETCH_EPOCH_TIME, 0)),
+        extractorStats.getStartFetchEpochTime());
+    Assert.assertEquals(this.workUnitState.getPropAsLong(
+        KafkaUtils.getPartitionPropName(KafkaSource.STOP_FETCH_EPOCH_TIME, 0)),
+        extractorStats.getStopFetchEpochTime());
+    Assert.assertEquals(KafkaUtils.getPartitionAvgRecordMillis(this.workUnitState, PARTITION0),
+        extractorStats.getAvgMillisPerRecord());
+
+    // restore values since other tests check for them
+    extractorStats.setStartFetchEpochTime(0);
+    extractorStats.setStopFetchEpochTime(0);
+    extractorStats.setAvgMillisPerRecord(-1);
+
     Assert.assertTrue(result.get(PARTITION0).containsKey("testKey"));
     Assert.assertEquals(result.get(PARTITION0).get("testKey"), "testValue");
     Assert.assertFalse(result.get(PARTITION1).containsKey("testKey"));