You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2020/03/27 18:34:27 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1100] Set average fetch time in the KafkaExtractor even when…
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 569f260 [GOBBLIN-1100] Set average fetch time in the KafkaExtractor even when…
569f260 is described below
commit 569f260ad23325a393b4b8a273951e907c09d5d3
Author: Hung Tran <hu...@linkedin.com>
AuthorDate: Fri Mar 27 11:34:20 2020 -0700
[GOBBLIN-1100] Set average fetch time in the KafkaExtractor even when…
Closes #2941 from htran1/kafka-missing-avg-fetch-time
---
.../extractor/extract/kafka/KafkaExtractor.java | 10 +++++-
.../extract/kafka/KafkaExtractorStatsTracker.java | 25 ++-------------
.../kafka/KafkaExtractorStatsTrackerTest.java | 37 +++++++++++++++++++---
3 files changed, 44 insertions(+), 28 deletions(-)
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index 4055fb0..2006a57 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.source.extractor.extract.kafka;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -28,6 +29,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import lombok.Getter;
@@ -319,8 +321,14 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
this.workUnitState.setProp(ConfigurationKeys.ERROR_PARTITION_COUNT, this.statsTracker.getErrorPartitionCount());
this.workUnitState.setProp(ConfigurationKeys.ERROR_MESSAGE_UNDECODABLE_COUNT, this.statsTracker.getUndecodableMessageCount());
this.workUnitState.setActualHighWatermark(this.nextWatermark);
+
+ // Need to call this even when not emitting metrics because some state, such as the average pull time,
+ // is updated when the tags are generated
+ Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap = this.statsTracker.generateTagsForPartitions(
+ this.lowWatermark, this.highWatermark, this.nextWatermark, Maps.newHashMap());
+
if (isInstrumentationEnabled()) {
- this.statsTracker.emitTrackingEvents(getMetricContext(), this.lowWatermark, this.highWatermark, this.nextWatermark);
+ this.statsTracker.emitTrackingEvents(getMetricContext(), tagsForPartitionsMap);
}
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index a1f4520..2913b54 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -396,30 +396,9 @@ public class KafkaExtractorStatsTracker {
/**
* Emit Tracking events reporting the various statistics to be consumed by a monitoring application.
* @param context the current {@link MetricContext}
- * @param lowWatermark begin Kafka offset for each topic partition
- * @param highWatermark the expected last Kafka offset for each topic partition to be consumed by the Extractor
- * @param nextWatermark the offset of next valid message for each Kafka topic partition consumed by the Extractor
+ * @param tagsForPartitionsMap tags for each partition
*/
- public void emitTrackingEvents(MetricContext context, MultiLongWatermark lowWatermark, MultiLongWatermark highWatermark,
- MultiLongWatermark nextWatermark) {
- emitTrackingEventsWithAdditionalTags(context, lowWatermark, highWatermark, nextWatermark, Maps.newHashMap());
- }
-
- /**
- * Emit Tracking events reporting the various statistics to be consumed by a monitoring application, with additional
- * map representing tags beyond what are constructed in {@link #createTagsForPartition(int, MultiLongWatermark, MultiLongWatermark, MultiLongWatermark) }
- *
- * Choose to not to make createTagsForPartition extensible to avoid additional derived class just for additional k-v pairs
- * in the tag maps.
- *
- * @param additionalTags caller-provided mapping from {@link KafkaPartition} to {@link Map<String, String>}, which will
- * be merged with result of {@link #createTagsForPartition}.
- */
- public void emitTrackingEventsWithAdditionalTags(MetricContext context, MultiLongWatermark lowWatermark, MultiLongWatermark highWatermark,
- MultiLongWatermark nextWatermark, Map<KafkaPartition, Map<String, String>> additionalTags) {
- Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap =
- generateTagsForPartitions(lowWatermark, highWatermark, nextWatermark, additionalTags);
-
+ public void emitTrackingEvents(MetricContext context, Map<KafkaPartition, Map<String, String>> tagsForPartitionsMap) {
for (Map.Entry<KafkaPartition, Map<String, String>> eventTags : tagsForPartitionsMap.entrySet()) {
EventSubmitter.Builder eventSubmitterBuilder = new EventSubmitter.Builder(context, GOBBLIN_KAFKA_NAMESPACE);
eventSubmitterBuilder.addMetadata(this.taskEventMetadataGenerator.getMetadata(workUnitState, KAFKA_EXTRACTOR_TOPIC_METADATA_EVENT_NAME));
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
index 278ff9b..1f959f2 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
@@ -33,10 +33,11 @@ import com.google.common.collect.ImmutableMap;
import org.apache.gobblin.configuration.WorkUnitState;
-
+@Test(singleThreaded = true)
public class KafkaExtractorStatsTrackerTest {
List<KafkaPartition> kafkaPartitions = new ArrayList<>();
private KafkaExtractorStatsTracker extractorStatsTracker;
+ private WorkUnitState workUnitState;
final static KafkaPartition PARTITION0 = new KafkaPartition.Builder().withTopicName("test-topic").withId(0).build();
final static KafkaPartition PARTITION1 = new KafkaPartition.Builder().withTopicName("test-topic").withId(1).build();
@@ -44,9 +45,9 @@ public class KafkaExtractorStatsTrackerTest {
public void setUp() {
kafkaPartitions.add(PARTITION0);
kafkaPartitions.add(PARTITION1);
- WorkUnitState workUnitState = new WorkUnitState();
- workUnitState.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, 10L);
- workUnitState.setProp(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, true);
+ this.workUnitState = new WorkUnitState();
+ this.workUnitState.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, 10L);
+ this.workUnitState.setProp(KafkaSource.OBSERVED_LATENCY_MEASUREMENT_ENABLED, true);
this.extractorStatsTracker = new KafkaExtractorStatsTracker(workUnitState, kafkaPartitions);
}
@@ -195,8 +196,36 @@ public class KafkaExtractorStatsTrackerTest {
MultiLongWatermark nextWatermark = new MultiLongWatermark(Arrays.asList(new Long(15), new Long(25)));
Map<KafkaPartition, Map<String, String>> addtionalTags =
ImmutableMap.of(PARTITION0, ImmutableMap.of("testKey", "testValue"));
+
+ this.workUnitState.removeProp(KafkaUtils.getPartitionPropName(KafkaSource.START_FETCH_EPOCH_TIME, 0));
+ this.workUnitState.removeProp(KafkaUtils.getPartitionPropName(KafkaSource.STOP_FETCH_EPOCH_TIME, 0));
+ KafkaUtils.setPartitionAvgRecordMillis(this.workUnitState, PARTITION0, 0);
+
+ KafkaExtractorStatsTracker.ExtractorStats extractorStats = this.extractorStatsTracker.getStatsMap()
+ .get(kafkaPartitions.get(0));
+
+ extractorStats.setStartFetchEpochTime(1000);
+ extractorStats.setStopFetchEpochTime(10000);
+ extractorStats.setAvgMillisPerRecord(10.1);
+
Map<KafkaPartition, Map<String, String>> result =
extractorStatsTracker.generateTagsForPartitions(lowWatermark, highWatermark, nextWatermark, addtionalTags);
+
+ // generateTagsForPartitions will set the following in the workUnitState
+ Assert.assertEquals(this.workUnitState.getPropAsLong(
+ KafkaUtils.getPartitionPropName(KafkaSource.START_FETCH_EPOCH_TIME, 0)),
+ extractorStats.getStartFetchEpochTime());
+ Assert.assertEquals(this.workUnitState.getPropAsLong(
+ KafkaUtils.getPartitionPropName(KafkaSource.STOP_FETCH_EPOCH_TIME, 0)),
+ extractorStats.getStopFetchEpochTime());
+ Assert.assertEquals(KafkaUtils.getPartitionAvgRecordMillis(this.workUnitState, PARTITION0),
+ extractorStats.getAvgMillisPerRecord());
+
+ // restore values since other tests check for them
+ extractorStats.setStartFetchEpochTime(0);
+ extractorStats.setStopFetchEpochTime(0);
+ extractorStats.setAvgMillisPerRecord(-1);
+
Assert.assertTrue(result.get(PARTITION0).containsKey("testKey"));
Assert.assertEquals(result.get(PARTITION0).get("testKey"), "testValue");
Assert.assertFalse(result.get(PARTITION1).containsKey("testKey"));