You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2019/03/14 00:50:08 UTC

[incubator-druid] branch 0.14.0-incubating updated: [Backport] Fix record validation in SeekableStreamIndexTaskRunner (#7261)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch 0.14.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.14.0-incubating by this push:
     new ca4ee7a  [Backport] Fix record validation in SeekableStreamIndexTaskRunner (#7261)
ca4ee7a is described below

commit ca4ee7a0ce23738944f2c76b65ea3f537f739634
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Wed Mar 13 17:49:58 2019 -0700

    [Backport] Fix record validation in SeekableStreamIndexTaskRunner (#7261)
    
    * Fix record validation in SeekableStreamIndexTaskRunner
    
    * fix validation
---
 .../indexing/kafka/LegacyKafkaIndexTaskRunner.java |   2 +-
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |  42 ++++--
 .../indexing/kinesis/KinesisIndexTaskTest.java     | 152 ++++++++++++++++++++-
 .../seekablestream/SeekableStreamIndexTask.java    |   4 +-
 .../SeekableStreamIndexTaskRunner.java             |  61 +++++----
 5 files changed, 217 insertions(+), 44 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index d081a0e..b065361 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -976,7 +976,7 @@ public class LegacyKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<In
   }
 
   @Override
-  public Map<Integer, Long> getCurrentOffsets()
+  public ConcurrentMap<Integer, Long> getCurrentOffsets()
   {
     return nextOffsets;
   }
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index b7b3896..76433c2 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -215,8 +215,6 @@ public class KafkaIndexTaskTest
   private File reportsFile;
   private RowIngestionMetersFactory rowIngestionMetersFactory;
 
-  private int handoffCount = 0;
-
   // This should be removed in versions greater that 0.12.x
   // isIncrementalHandoffSupported should always be set to true in those later versions
   @Parameterized.Parameters(name = "isIncrementalHandoffSupported = {0}")
@@ -873,7 +871,14 @@ public class KafkaIndexTaskTest
         new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10", "20.0", "1.0")),
         new ProducerRecord<>(topic, 0, null, JB("2011", "D", "y", "10", "20.0", "1.0")),
         new ProducerRecord<>(topic, 0, null, JB("2012", "e", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, JB("2009", "B", "y", "10", "20.0", "1.0"))
+        new ProducerRecord<>(topic, 0, null, JB("2009", "B", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2008", "A", "x", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2009", "B", "x", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2010", "C", "x", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2011", "D", "x", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2011", "d", "x", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2012", "E", "x", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, JB("2009", "b", "x", "10", "20.0", "1.0"))
     );
 
     final String baseSequenceName = "sequence0";
@@ -897,9 +902,13 @@ public class KafkaIndexTaskTest
         topic,
         ImmutableMap.of(0, 5L)
     );
+    final SeekableStreamPartitions<Integer, Long> checkpoint2 = new SeekableStreamPartitions<>(
+        topic,
+        ImmutableMap.of(0, 12L)
+    );
     final SeekableStreamPartitions<Integer, Long> endPartitions = new SeekableStreamPartitions<>(
         topic,
-        ImmutableMap.of(0, 7L)
+        ImmutableMap.of(0, Long.MAX_VALUE)
     );
 
     final KafkaIndexTask task = createTask(
@@ -921,17 +930,28 @@ public class KafkaIndexTaskTest
     while (task.getRunner().getStatus() != Status.PAUSED) {
       Thread.sleep(10);
     }
-    final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
-    Assert.assertTrue(checkpoint1.getPartitionSequenceNumberMap().equals(currentOffsets));
+    Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+    Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets);
+
+    // Simulating the case when another replica has consumed up to the offset of 8
+    task.getRunner().setEndOffsets(ImmutableMap.of(0, 8L), false);
 
-    // actual checkpoint offset is 5, but simulating behavior of publishing set end offset call, to ensure this task
-    // will continue reading through the end offset of the checkpointed sequence
-    task.getRunner().setEndOffsets(ImmutableMap.of(0, 6L), true);
+    // The task is supposed to consume remaining rows up to the offset of 13
+    while (task.getRunner().getStatus() != Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+    Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets);
+
+    task.getRunner().setEndOffsets(
+        ImmutableMap.of(0, task.getRunner().getCurrentOffsets().get(0) + 1L),
+        true
+    );
 
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
 
-    // processed count would be 5 if it stopped at it's current offsets
-    Assert.assertEquals(6, task.getRunner().getRowIngestionMeters().getProcessed());
+    // processed count would be 8 if it stopped at it's current offsets
+    Assert.assertEquals(13, task.getRunner().getRowIngestionMeters().getProcessed());
     Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
     Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
   }
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index a8130b9..58fa72a 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -177,6 +177,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -640,7 +641,6 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc7));
   }
 
-
   @Test(timeout = 120_000L)
   public void testIncrementalHandOffMaxTotalRows() throws Exception
   {
@@ -2278,7 +2278,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
 
     verifyAll();
 
-    Map<String, String> currentOffsets = task.getRunner().getCurrentOffsets();
+    ConcurrentMap<String, String> currentOffsets = task.getRunner().getCurrentOffsets();
 
     try {
       future.get(10, TimeUnit.SECONDS);
@@ -2424,6 +2424,154 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
   }
 
+  @Test(timeout = 5000L)
+  public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
+  {
+    final List<OrderedPartitionableRecord<String, String>> records = ImmutableList.of(
+        new OrderedPartitionableRecord<>(stream, "1", "0", JB("2008", "a", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "1", JB("2009", "b", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "2", JB("2010", "c", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "3", JB("2011", "d", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "4", JB("2011", "e", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "5", JB("2012", "a", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "6", JB("2013", "b", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "7", JB("2010", "c", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "8", JB("2011", "d", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "9", JB("2011", "e", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "10", JB("2008", "a", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "11", JB("2009", "b", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "12", JB("2010", "c", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "13", JB("2012", "d", "y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "14", JB("2013", "e", "y", "10", "20.0", "1.0"))
+    );
+
+    final String baseSequenceName = "sequence0";
+    // as soon as any segment has more than one record, incremental publishing should happen
+    maxRowsPerSegment = 2;
+    maxRecordsPerPoll = 1;
+
+    recordSupplier.assign(anyObject());
+    expectLastCall().anyTimes();
+
+    expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
+
+    recordSupplier.seek(anyObject(), anyString());
+    expectLastCall().anyTimes();
+
+    expect(recordSupplier.poll(anyLong())).andReturn(records.subList(0, 5))
+                                          .once()
+                                          .andReturn(records.subList(4, 10))
+                                          .once()
+                                          .andReturn(records.subList(9, 15))
+                                          .once();
+
+    recordSupplier.close();
+    expectLastCall().once();
+
+    replayAll();
+
+    final SeekableStreamPartitions<String, String> startPartitions = new SeekableStreamPartitions<>(
+        stream,
+        ImmutableMap.of(
+            shardId1,
+            "0"
+        )
+    );
+
+    final SeekableStreamPartitions<String, String> checkpoint1 = new SeekableStreamPartitions<>(
+        stream,
+        ImmutableMap.of(
+            shardId1,
+            "4"
+        )
+    );
+
+    final SeekableStreamPartitions<String, String> checkpoint2 = new SeekableStreamPartitions<>(
+        stream,
+        ImmutableMap.of(
+            shardId1,
+            "9"
+        )
+    );
+
+    final SeekableStreamPartitions<String, String> endPartitions = new SeekableStreamPartitions<>(
+        stream,
+        ImmutableMap.of(
+            shardId1,
+            "14"
+        )
+    );
+    final KinesisIndexTask task = createTask(
+        null,
+        new KinesisIndexTaskIOConfig(
+            null,
+            baseSequenceName,
+            startPartitions,
+            endPartitions,
+            true,
+            null,
+            null,
+            "awsEndpoint",
+            null,
+            null,
+            null,
+            null,
+            null,
+            false
+        )
+    );
+    final ListenableFuture<TaskStatus> future = runTask(task);
+    while (task.getRunner().getStatus() != SeekableStreamIndexTaskRunner.Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    Map<String, String> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+    Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets);
+    task.getRunner().setEndOffsets(currentOffsets, false);
+
+    // The task is supposed to consume remaining rows up to the offset of 13
+    while (task.getRunner().getStatus() != Status.PAUSED) {
+      Thread.sleep(10);
+    }
+    currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+    Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets);
+
+    task.getRunner().setEndOffsets(
+        ImmutableMap.of(shardId1, String.valueOf(Long.valueOf(task.getRunner().getCurrentOffsets().get(shardId1)) + 1)),
+        true
+    );
+
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+
+    verifyAll();
+
+    Assert.assertEquals(2, checkpointRequestsHash.size());
+
+    // Check metrics
+    Assert.assertEquals(12, task.getRunner().getRowIngestionMeters().getProcessed());
+    Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
+    Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
+
+    // Check published metadata
+    final Set<SegmentDescriptor> descriptors = new HashSet<>();
+    descriptors.add(SD(task, "2008/P1D", 0));
+    descriptors.add(SD(task, "2008/P1D", 1));
+    descriptors.add(SD(task, "2009/P1D", 0));
+    descriptors.add(SD(task, "2010/P1D", 0));
+    descriptors.add(SD(task, "2010/P1D", 1));
+    descriptors.add(SD(task, "2011/P1D", 0));
+    descriptors.add(SD(task, "2011/P1D", 1));
+    descriptors.add(SD(task, "2012/P1D", 0));
+    descriptors.add(SD(task, "2013/P1D", 0));
+    Assert.assertEquals(descriptors, publishedDescriptors());
+    Assert.assertEquals(
+        new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, ImmutableMap.of(
+            shardId1,
+            "10"
+        ))),
+        metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
+    );
+  }
+
   private ListenableFuture<TaskStatus> runTask(final Task task)
   {
     try {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 5219060..4a596e4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -60,8 +60,8 @@ import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
 import java.util.Map;
 
-public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> extends AbstractTask
-    implements ChatHandler
+public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>
+    extends AbstractTask implements ChatHandler
 {
   public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
   private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTask.class);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index c86a2b5..9c5352f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -196,7 +196,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
   private final List<ListenableFuture<SegmentsAndMetadata>> publishWaitList = new ArrayList<>();
   private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList = new ArrayList<>();
-  private final Map<PartitionIdType, SequenceOffsetType> initialOffsetsSnapshot = new HashMap<>();
+  private final Set<PartitionIdType> initialOffsetsSnapshot = new HashSet<>();
   private final Set<PartitionIdType> exclusiveStartingPartitions = new HashSet<>();
 
   private volatile DateTime startTime;
@@ -454,7 +454,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
       status = Status.READING;
       Throwable caughtExceptionInner = null;
 
-      initialOffsetsSnapshot.putAll(currOffsets);
+      initialOffsetsSnapshot.addAll(currOffsets.keySet());
       exclusiveStartingPartitions.addAll(ioConfig.getExclusiveStartSequenceNumberPartitions());
 
       try {
@@ -490,7 +490,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
 
           maybePersistAndPublishSequences(committerSupplier);
 
-
           // calling getRecord() ensures that exceptions specific to kafka/kinesis like OffsetOutOfRangeException
           // are handled in the subclasses.
           List<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType>> records = getRecords(
@@ -511,9 +510,9 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
               continue;
             }
 
-            // for the first message we receive, check that we were given a message with a sequenceNumber that matches our
-            // expected starting sequenceNumber
-            if (!verifyInitialRecordAndSkipExclusivePartition(record, initialOffsetsSnapshot)) {
+            // for the first message we receive, check that we were given a message with a sequenceNumber that matches
+            // our expected starting sequenceNumber
+            if (!verifyInitialRecordAndSkipExclusivePartition(record)) {
               continue;
             }
 
@@ -1291,7 +1290,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
     return getCurrentOffsets();
   }
 
-  public Map<PartitionIdType, SequenceOffsetType> getCurrentOffsets()
+  public ConcurrentMap<PartitionIdType, SequenceOffsetType> getCurrentOffsets()
   {
     return currOffsets;
   }
@@ -1394,14 +1393,15 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
         // do not mark the starting sequence number as exclusive
         Set<PartitionIdType> exclusivePartitions = sequenceNumbers.keySet()
                                                                   .stream()
-                                                                  .filter(x -> !initialOffsetsSnapshot.containsKey(x)
+                                                                  .filter(x -> !initialOffsetsSnapshot.contains(x)
                                                                                || ioConfig.getExclusiveStartSequenceNumberPartitions()
                                                                                           .contains(x))
                                                                   .collect(Collectors.toSet());
 
-        if ((latestSequence.getStartOffsets().equals(sequenceNumbers) && latestSequence.exclusiveStartPartitions.equals(
-            exclusivePartitions) && !finish) ||
-            (latestSequence.getEndOffsets().equals(sequenceNumbers) && finish)) {
+        if ((latestSequence.getStartOffsets().equals(sequenceNumbers)
+             && latestSequence.exclusiveStartPartitions.equals(exclusivePartitions)
+             && !finish)
+            || (latestSequence.getEndOffsets().equals(sequenceNumbers) && finish)) {
           log.warn("Ignoring duplicate request, end sequences already set for sequences [%s]", sequenceNumbers);
           resume();
           return Response.ok(sequenceNumbers).build();
@@ -1452,7 +1452,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
               exclusivePartitions
           );
           sequences.add(newSequence);
-          initialOffsetsSnapshot.putAll(sequenceNumbers);
+          initialOffsetsSnapshot.addAll(sequenceNumbers.keySet());
         }
         persistSequences();
       }
@@ -1892,33 +1892,38 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   }
 
   private boolean verifyInitialRecordAndSkipExclusivePartition(
-      final OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record,
-      final Map<PartitionIdType, SequenceOffsetType> intialSequenceSnapshot
+      final OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record
   )
   {
-    if (intialSequenceSnapshot.containsKey(record.getPartitionId())) {
-      if (!intialSequenceSnapshot.get(record.getPartitionId()).equals(record.getSequenceNumber())) {
+    // Check only for the first record among the record batch.
+    if (initialOffsetsSnapshot.contains(record.getPartitionId())) {
+      final SequenceOffsetType currOffset = Preconditions.checkNotNull(
+          currOffsets.get(record.getPartitionId()),
+          "Current offset is null for sequenceNumber[%s] and partitionId[%s]",
+          record.getSequenceNumber(),
+          record.getPartitionId()
+      );
+      final OrderedSequenceNumber<SequenceOffsetType> recordSequenceNumber = createSequenceNumber(
+          record.getSequenceNumber()
+      );
+      final OrderedSequenceNumber<SequenceOffsetType> currentSequenceNumber = createSequenceNumber(
+          currOffset
+      );
+      if (recordSequenceNumber.compareTo(currentSequenceNumber) < 0) {
         throw new ISE(
-            "Starting sequenceNumber [%s] does not match expected [%s] for partition [%s]",
+            "sequenceNumber of the start record[%s] is smaller than current sequenceNumber[%s] for partition[%s]",
             record.getSequenceNumber(),
-            intialSequenceSnapshot.get(record.getPartitionId()),
+            currOffset,
             record.getPartitionId()
         );
       }
 
-      log.info(
-          "Verified starting sequenceNumber [%s] for partition [%s]",
-          record.getSequenceNumber(), record.getPartitionId()
-      );
-
-      intialSequenceSnapshot.remove(record.getPartitionId());
-      if (intialSequenceSnapshot.isEmpty()) {
-        log.info("Verified starting sequences for all partitions");
-      }
+      // Remove the mark to notify that this partition has been read.
+      initialOffsetsSnapshot.remove(record.getPartitionId());
 
       // check exclusive starting sequence
       if (isStartingSequenceOffsetsExclusive() && exclusiveStartingPartitions.contains(record.getPartitionId())) {
-        log.info("Skipping starting sequenceNumber for partition [%s] marked exclusive", record.getPartitionId());
+        log.info("Skipping starting sequenceNumber for partition[%s] marked exclusive", record.getPartitionId());
 
         return false;
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org