You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text version).
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/06/14 17:50:36 UTC
[gobblin] branch master updated: [GOBBLIN-1470] Commit watermarks to state store for quiet topics with no data[]
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d3f4ea2 [GOBBLIN-1470] Commit watermarks to state store for quiet topics with no data[]
d3f4ea2 is described below
commit d3f4ea294933f6072130f8397ef827700d143993
Author: suvasude <su...@linkedin.biz>
AuthorDate: Mon Jun 14 10:50:29 2021 -0700
[GOBBLIN-1470] Commit watermarks to state store for quiet topics with no data[]
Closes #3310 from sv2000/quietTopicWatermark
---
.../extract/kafka/KafkaProduceRateTracker.java | 8 ++----
.../extract/kafka/KafkaProduceRateTrackerTest.java | 33 ++++++++++++++++++++++
.../extract/kafka/KafkaStreamingExtractorTest.java | 33 +++++++++++++---------
3 files changed, 54 insertions(+), 20 deletions(-)
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTracker.java
index b3e67e6..ab88eec 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTracker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTracker.java
@@ -256,14 +256,10 @@ public class KafkaProduceRateTracker {
(KafkaStreamingExtractor.KafkaWatermark) lastCommittedWatermarks.get(partition.toString());
KafkaStreamingExtractor.KafkaWatermark unacknowledgedWatermark =
(KafkaStreamingExtractor.KafkaWatermark) unacknowledgedWatermarks.get(partition.toString());
- if (unacknowledgedWatermark == null && kafkaWatermark == null) {
- //If there is no previously committed watermark for the topic partition and no records in current time window, no further
- //processing needed for the topic partition.
- continue;
- }
+
if (kafkaWatermark == null) {
//If there is no previously committed watermark for the topic partition, create a dummy watermark for computing stats
- kafkaWatermark = new KafkaStreamingExtractor.KafkaWatermark(partition, new LongWatermark(0L));
+ kafkaWatermark = new KafkaStreamingExtractor.KafkaWatermark(partition, new LongWatermark(maxOffset >= 0? maxOffset : 0L));
}
long avgRecordSize = this.statsTracker.getAvgRecordSize(partitionIndex);
long previousMaxOffset = highWatermark.get(partitionIndex++);
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.java
index 2dbc351..bf8d37f 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.java
@@ -127,6 +127,39 @@ public class KafkaProduceRateTrackerTest {
}
@Test
+ public void testWriteProduceRateToKafkaWatermarksNoData() {
+ long currentTime = System.currentTimeMillis();
+
+ WorkUnitState workUnitState = new WorkUnitState();
+ workUnitState.setProp(KafkaProduceRateTracker.KAFKA_PRODUCE_RATE_DISABLE_STATS_ON_HOLIDAYS_KEY, false);
+ workUnitState.setProp(FlushingExtractor.FLUSH_INTERVAL_SECONDS_KEY, 1L);
+ workUnitState.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, 5L);
+ WatermarkTracker watermarkTracker = new LastWatermarkTracker(false);
+ KafkaExtractorStatsTracker extractorStatsTracker = new KafkaExtractorStatsTracker(workUnitState, kafkaPartitions);
+
+ KafkaProduceRateTracker tracker =
+ new KafkaProduceRateTracker(workUnitState, kafkaPartitions, watermarkTracker, extractorStatsTracker, currentTime);
+
+ Map<KafkaPartition, Long> latestOffsetMap = Maps.newHashMap();
+ latestOffsetMap.put(kafkaPartitions.get(0), 20L);
+ latestOffsetMap.put(kafkaPartitions.get(1), 30L);
+ Map<String, CheckpointableWatermark> lastCommittedWatermarks = Maps.newHashMap();
+
+ //No new data; High watermark same as latest offsets
+ MultiLongWatermark highWatermark = new MultiLongWatermark(Lists.newArrayList(20L, 30L));
+ extractorStatsTracker.reset();
+ tracker.writeProduceRateToKafkaWatermarks(latestOffsetMap, lastCommittedWatermarks, highWatermark, currentTime);
+
+ Map<String, CheckpointableWatermark> unacknowledgedWatermarks = watermarkTracker.getAllUnacknowledgedWatermarks();
+ for (KafkaPartition topicPartition : kafkaPartitions) {
+ KafkaStreamingExtractor.KafkaWatermark kafkaWatermark = (KafkaStreamingExtractor.KafkaWatermark) unacknowledgedWatermarks.get(topicPartition.toString());
+ Assert.assertTrue(kafkaWatermark.avgProduceRates != null);
+ Assert.assertTrue(kafkaWatermark.avgConsumeRate < 0);
+ Assert.assertTrue(kafkaWatermark.getLwm().getValue() > 0);
+ }
+ }
+
+ @Test (dependsOnMethods = "testWriteProduceRateToKafkaWatermarksNoData")
public void testWriteProduceRateToKafkaWatermarks() {
long readStartTime = System.nanoTime();
long decodeStartTime = readStartTime + 1;
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
index 1cf7ff2..927fe74 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
@@ -47,16 +47,21 @@ public class KafkaStreamingExtractorTest {
throws IOException, DataRecordException {
MultiLongWatermark highWatermark1 = new MultiLongWatermark(this.streamingExtractor.highWatermark);
- //Read 2 records
+ //Read 3 records
this.streamingExtractor.readStreamEntityImpl();
- Assert.assertEquals( this.streamingExtractor.nextWatermark.get(0), 1L);
- Assert.assertEquals( this.streamingExtractor.nextWatermark.get(1), 0L);
- Assert.assertEquals( this.streamingExtractor.nextWatermark.get(2), 0L);
+ Assert.assertEquals(this.streamingExtractor.nextWatermark.get(0), 1L);
+ Assert.assertEquals(this.streamingExtractor.nextWatermark.get(1), 0L);
+ Assert.assertEquals(this.streamingExtractor.nextWatermark.get(2), 0L);
- this.streamingExtractor.readStreamEntityImpl();
- Assert.assertEquals( this.streamingExtractor.nextWatermark.get(0), 1L);
- Assert.assertEquals( this.streamingExtractor.nextWatermark.get(1), 1L);
- Assert.assertEquals( this.streamingExtractor.nextWatermark.get(2), 0L);
+ streamingExtractor.readStreamEntityImpl();
+ Assert.assertEquals(this.streamingExtractor.nextWatermark.get(0), 1L);
+ Assert.assertEquals(this.streamingExtractor.nextWatermark.get(1), 1L);
+ Assert.assertEquals(this.streamingExtractor.nextWatermark.get(2), 0L);
+
+ streamingExtractor.readStreamEntityImpl();
+ Assert.assertEquals(this.streamingExtractor.nextWatermark.get(0), 1L);
+ Assert.assertEquals(this.streamingExtractor.nextWatermark.get(1), 1L);
+ Assert.assertEquals(this.streamingExtractor.nextWatermark.get(2), 1L);
//Checkpoint watermarks
this.streamingExtractor.onFlushAck();
@@ -70,13 +75,13 @@ public class KafkaStreamingExtractorTest {
MultiLongWatermark highWatermark2 = new MultiLongWatermark(this.streamingExtractor.highWatermark);
//Read 1 more record
this.streamingExtractor.readStreamEntityImpl();
- Assert.assertEquals( this.streamingExtractor.nextWatermark.get(0), 1L);
- Assert.assertEquals( this.streamingExtractor.nextWatermark.get(1), 1L);
- Assert.assertEquals( this.streamingExtractor.nextWatermark.get(2), 1L);
+ Assert.assertEquals(this.streamingExtractor.nextWatermark.get(0), 2L);
+ Assert.assertEquals(this.streamingExtractor.nextWatermark.get(1), 1L);
+ Assert.assertEquals(this.streamingExtractor.nextWatermark.get(2), 1L);
- Assert.assertEquals( this.streamingExtractor.lowWatermark.get(0), 1L);
- Assert.assertEquals( this.streamingExtractor.lowWatermark.get(1), 1L);
- Assert.assertEquals( this.streamingExtractor.lowWatermark.get(2), 0L);
+ Assert.assertEquals(this.streamingExtractor.lowWatermark.get(0), 1L);
+ Assert.assertEquals(this.streamingExtractor.lowWatermark.get(1), 1L);
+ Assert.assertEquals(this.streamingExtractor.lowWatermark.get(2), 1L);
//Checkpoint watermarks
this.streamingExtractor.onFlushAck();