You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this rendering.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/06/14 17:50:36 UTC

[gobblin] branch master updated: [GOBBLIN-1470] Commit watermarks to state store for quiet topics with no data[]

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d3f4ea2  [GOBBLIN-1470] Commit watermarks to state store for quiet topics with no data[]
d3f4ea2 is described below

commit d3f4ea294933f6072130f8397ef827700d143993
Author: suvasude <su...@linkedin.biz>
AuthorDate: Mon Jun 14 10:50:29 2021 -0700

    [GOBBLIN-1470] Commit watermarks to state store for quiet topics with no data[]
    
    Closes #3310 from sv2000/quietTopicWatermark
---
 .../extract/kafka/KafkaProduceRateTracker.java     |  8 ++----
 .../extract/kafka/KafkaProduceRateTrackerTest.java | 33 ++++++++++++++++++++++
 .../extract/kafka/KafkaStreamingExtractorTest.java | 33 +++++++++++++---------
 3 files changed, 54 insertions(+), 20 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTracker.java
index b3e67e6..ab88eec 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTracker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTracker.java
@@ -256,14 +256,10 @@ public class KafkaProduceRateTracker {
           (KafkaStreamingExtractor.KafkaWatermark) lastCommittedWatermarks.get(partition.toString());
       KafkaStreamingExtractor.KafkaWatermark unacknowledgedWatermark =
           (KafkaStreamingExtractor.KafkaWatermark) unacknowledgedWatermarks.get(partition.toString());
-      if (unacknowledgedWatermark == null && kafkaWatermark == null) {
-        //If there is no previously committed watermark for the topic partition and no records in current time window, no further
-        //processing needed for the topic partition.
-        continue;
-      }
+
       if (kafkaWatermark == null) {
         //If there is no previously committed watermark for the topic partition, create a dummy watermark for computing stats
-        kafkaWatermark = new KafkaStreamingExtractor.KafkaWatermark(partition, new LongWatermark(0L));
+        kafkaWatermark = new KafkaStreamingExtractor.KafkaWatermark(partition, new LongWatermark(maxOffset >= 0? maxOffset : 0L));
       }
       long avgRecordSize = this.statsTracker.getAvgRecordSize(partitionIndex);
       long previousMaxOffset = highWatermark.get(partitionIndex++);
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.java
index 2dbc351..bf8d37f 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaProduceRateTrackerTest.java
@@ -127,6 +127,39 @@ public class KafkaProduceRateTrackerTest {
   }
 
   @Test
+  public void testWriteProduceRateToKafkaWatermarksNoData() {
+    long currentTime = System.currentTimeMillis();
+
+    WorkUnitState workUnitState = new WorkUnitState();
+    workUnitState.setProp(KafkaProduceRateTracker.KAFKA_PRODUCE_RATE_DISABLE_STATS_ON_HOLIDAYS_KEY, false);
+    workUnitState.setProp(FlushingExtractor.FLUSH_INTERVAL_SECONDS_KEY, 1L);
+    workUnitState.setProp(KafkaSource.RECORD_LEVEL_SLA_MINUTES_KEY, 5L);
+    WatermarkTracker watermarkTracker = new LastWatermarkTracker(false);
+    KafkaExtractorStatsTracker extractorStatsTracker = new KafkaExtractorStatsTracker(workUnitState, kafkaPartitions);
+
+    KafkaProduceRateTracker tracker =
+        new KafkaProduceRateTracker(workUnitState, kafkaPartitions, watermarkTracker, extractorStatsTracker, currentTime);
+
+    Map<KafkaPartition, Long> latestOffsetMap = Maps.newHashMap();
+    latestOffsetMap.put(kafkaPartitions.get(0), 20L);
+    latestOffsetMap.put(kafkaPartitions.get(1), 30L);
+    Map<String, CheckpointableWatermark> lastCommittedWatermarks = Maps.newHashMap();
+
+    //No new data; High watermark same as latest offsets
+    MultiLongWatermark highWatermark = new MultiLongWatermark(Lists.newArrayList(20L, 30L));
+    extractorStatsTracker.reset();
+    tracker.writeProduceRateToKafkaWatermarks(latestOffsetMap, lastCommittedWatermarks, highWatermark, currentTime);
+
+    Map<String, CheckpointableWatermark> unacknowledgedWatermarks = watermarkTracker.getAllUnacknowledgedWatermarks();
+    for (KafkaPartition topicPartition : kafkaPartitions) {
+      KafkaStreamingExtractor.KafkaWatermark kafkaWatermark = (KafkaStreamingExtractor.KafkaWatermark) unacknowledgedWatermarks.get(topicPartition.toString());
+      Assert.assertTrue(kafkaWatermark.avgProduceRates != null);
+      Assert.assertTrue(kafkaWatermark.avgConsumeRate < 0);
+      Assert.assertTrue(kafkaWatermark.getLwm().getValue() > 0);
+    }
+  }
+
+  @Test (dependsOnMethods = "testWriteProduceRateToKafkaWatermarksNoData")
   public void testWriteProduceRateToKafkaWatermarks() {
     long readStartTime = System.nanoTime();
     long decodeStartTime = readStartTime + 1;
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
index 1cf7ff2..927fe74 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
@@ -47,16 +47,21 @@ public class KafkaStreamingExtractorTest {
       throws IOException, DataRecordException {
     MultiLongWatermark highWatermark1 = new MultiLongWatermark(this.streamingExtractor.highWatermark);
 
-    //Read 2 records
+    //Read 3 records
     this.streamingExtractor.readStreamEntityImpl();
-    Assert.assertEquals( this.streamingExtractor.nextWatermark.get(0), 1L);
-    Assert.assertEquals( this.streamingExtractor.nextWatermark.get(1), 0L);
-    Assert.assertEquals( this.streamingExtractor.nextWatermark.get(2), 0L);
+    Assert.assertEquals(this.streamingExtractor.nextWatermark.get(0), 1L);
+    Assert.assertEquals(this.streamingExtractor.nextWatermark.get(1), 0L);
+    Assert.assertEquals(this.streamingExtractor.nextWatermark.get(2), 0L);
 
-    this.streamingExtractor.readStreamEntityImpl();
-    Assert.assertEquals( this.streamingExtractor.nextWatermark.get(0), 1L);
-    Assert.assertEquals( this.streamingExtractor.nextWatermark.get(1), 1L);
-    Assert.assertEquals( this.streamingExtractor.nextWatermark.get(2), 0L);
+    streamingExtractor.readStreamEntityImpl();
+    Assert.assertEquals(this.streamingExtractor.nextWatermark.get(0), 1L);
+    Assert.assertEquals(this.streamingExtractor.nextWatermark.get(1), 1L);
+    Assert.assertEquals(this.streamingExtractor.nextWatermark.get(2), 0L);
+
+    streamingExtractor.readStreamEntityImpl();
+    Assert.assertEquals(this.streamingExtractor.nextWatermark.get(0), 1L);
+    Assert.assertEquals(this.streamingExtractor.nextWatermark.get(1), 1L);
+    Assert.assertEquals(this.streamingExtractor.nextWatermark.get(2), 1L);
 
     //Checkpoint watermarks
     this.streamingExtractor.onFlushAck();
@@ -70,13 +75,13 @@ public class KafkaStreamingExtractorTest {
     MultiLongWatermark highWatermark2 = new MultiLongWatermark(this.streamingExtractor.highWatermark);
     //Read 1 more record
     this.streamingExtractor.readStreamEntityImpl();
-    Assert.assertEquals( this.streamingExtractor.nextWatermark.get(0), 1L);
-    Assert.assertEquals( this.streamingExtractor.nextWatermark.get(1), 1L);
-    Assert.assertEquals( this.streamingExtractor.nextWatermark.get(2), 1L);
+    Assert.assertEquals(this.streamingExtractor.nextWatermark.get(0), 2L);
+    Assert.assertEquals(this.streamingExtractor.nextWatermark.get(1), 1L);
+    Assert.assertEquals(this.streamingExtractor.nextWatermark.get(2), 1L);
 
-    Assert.assertEquals( this.streamingExtractor.lowWatermark.get(0), 1L);
-    Assert.assertEquals( this.streamingExtractor.lowWatermark.get(1), 1L);
-    Assert.assertEquals( this.streamingExtractor.lowWatermark.get(2), 0L);
+    Assert.assertEquals(this.streamingExtractor.lowWatermark.get(0), 1L);
+    Assert.assertEquals(this.streamingExtractor.lowWatermark.get(1), 1L);
+    Assert.assertEquals(this.streamingExtractor.lowWatermark.get(2), 1L);
 
     //Checkpoint watermarks
     this.streamingExtractor.onFlushAck();