You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2019/03/26 21:46:26 UTC

[incubator-druid] 01/01: Support kinesis compatibility

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch kinesis-compatibility
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git

commit 1352ee1dbf69aa462fd2ac308774dd12bdc688a1
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Tue Mar 26 10:50:16 2019 -0700

    Support kinesis compatibility
---
 .../indexing/kafka/KafkaIndexTaskIOConfig.java     |   8 +-
 .../indexing/kinesis/KinesisIndexTaskIOConfig.java | 117 +++++++++-
 .../indexing/kinesis/KinesisIOConfigTest.java      | 238 +++++++++++++++++++++
 .../indexing/kinesis/KinesisIndexTaskTest.java     |  60 +++---
 .../kinesis/supervisor/KinesisSupervisorTest.java  |   2 +-
 5 files changed, 386 insertions(+), 39 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
index 2200e47..37366f3 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
@@ -41,8 +41,10 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
       @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // can be null for backward compabitility
       @JsonProperty("baseSequenceName") String baseSequenceName,
       // startPartitions and endPartitions exist to be able to read old ioConfigs in metadata store
-      @JsonProperty("startPartitions") @Nullable SeekableStreamEndSequenceNumbers<Integer, Long> startPartitions,
-      @JsonProperty("endPartitions") @Nullable SeekableStreamEndSequenceNumbers<Integer, Long> endPartitions,
+      @JsonProperty("startPartitions") @Nullable
+      @Deprecated SeekableStreamEndSequenceNumbers<Integer, Long> startPartitions,
+      @JsonProperty("endPartitions") @Nullable
+      @Deprecated SeekableStreamEndSequenceNumbers<Integer, Long> endPartitions,
       // startSequenceNumbers and endSequenceNumbers must be set for new versions
       @JsonProperty("startSequenceNumbers")
       @Nullable SeekableStreamStartSequenceNumbers<Integer, Long> startSequenceNumbers,
@@ -115,6 +117,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
    * {@link SeekableStreamStartSequenceNumbers} didn't exist before.
    */
   @JsonProperty
+  @Deprecated
   public SeekableStreamEndSequenceNumbers<Integer, Long> getStartPartitions()
   {
     // Converting to start sequence numbers. This is allowed for Kafka because the start offset is always inclusive.
@@ -130,6 +133,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
    * old version of Druid.
    */
   @JsonProperty
+  @Deprecated
   public SeekableStreamEndSequenceNumbers<Integer, Long> getEndPartitions()
   {
     return getEndSequenceNumbers();
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
index f312dd6..e726ae2 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
@@ -28,6 +28,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbe
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
+import java.util.Set;
 
 public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<String, String>
 {
@@ -46,6 +47,17 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St
   public KinesisIndexTaskIOConfig(
       @JsonProperty("taskGroupId") @Nullable Integer taskGroupId,
       @JsonProperty("baseSequenceName") String baseSequenceName,
+      // below three deprecated variables exist to be able to read old ioConfigs in metadata store
+      @JsonProperty("startPartitions")
+      @Nullable
+      @Deprecated SeekableStreamEndSequenceNumbers<String, String> startPartitions,
+      @JsonProperty("endPartitions")
+      @Nullable
+      @Deprecated SeekableStreamEndSequenceNumbers<String, String> endPartitions,
+      @JsonProperty("exclusiveStartSequenceNumberPartitions")
+      @Nullable
+      @Deprecated Set<String> exclusiveStartSequenceNumberPartitions,
+      // startSequenceNumbers and endSequenceNumbers must be set for new versions
       @JsonProperty("startSequenceNumbers") SeekableStreamStartSequenceNumbers<String, String> startSequenceNumbers,
       @JsonProperty("endSequenceNumbers") SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers,
       @JsonProperty("useTransaction") Boolean useTransaction,
@@ -62,17 +74,17 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St
     super(
         taskGroupId,
         baseSequenceName,
-        startSequenceNumbers,
-        endSequenceNumbers,
+        getStartSequenceNumbers(startSequenceNumbers, startPartitions, exclusiveStartSequenceNumberPartitions),
+        endSequenceNumbers == null ? endPartitions : endSequenceNumbers,
         useTransaction,
         minimumMessageTime,
         maximumMessageTime
     );
     Preconditions.checkArgument(
-        endSequenceNumbers.getPartitionSequenceNumberMap()
-                          .values()
-                          .stream()
-                          .noneMatch(x -> x.equals(KinesisSequenceNumber.END_OF_SHARD_MARKER)),
+        getEndSequenceNumbers().getPartitionSequenceNumberMap()
+                               .values()
+                               .stream()
+                               .noneMatch(x -> x.equals(KinesisSequenceNumber.END_OF_SHARD_MARKER)),
         "End sequenceNumbers must not have the end of shard marker (EOS)"
     );
 
@@ -84,6 +96,99 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St
     this.deaggregate = deaggregate;
   }
 
+  public KinesisIndexTaskIOConfig(
+      int taskGroupId,
+      String baseSequenceName,
+      SeekableStreamStartSequenceNumbers<String, String> startSequenceNumbers,
+      SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers,
+      Boolean useTransaction,
+      DateTime minimumMessageTime,
+      DateTime maximumMessageTime,
+      String endpoint,
+      Integer recordsPerFetch,
+      Integer fetchDelayMillis,
+      String awsAssumedRoleArn,
+      String awsExternalId,
+      boolean deaggregate
+  )
+  {
+    this(
+        taskGroupId,
+        baseSequenceName,
+        null,
+        null,
+        null,
+        startSequenceNumbers,
+        endSequenceNumbers,
+        useTransaction,
+        minimumMessageTime,
+        maximumMessageTime,
+        endpoint,
+        recordsPerFetch,
+        fetchDelayMillis,
+        awsAssumedRoleArn,
+        awsExternalId,
+        deaggregate
+    );
+  }
+
+  private static SeekableStreamStartSequenceNumbers<String, String> getStartSequenceNumbers(
+      @Nullable SeekableStreamStartSequenceNumbers<String, String> newStartSequenceNumbers,
+      @Nullable SeekableStreamEndSequenceNumbers<String, String> oldStartSequenceNumbers,
+      @Nullable Set<String> exclusiveStartSequenceNumberPartitions
+  )
+  {
+    if (newStartSequenceNumbers == null) {
+      Preconditions.checkNotNull(
+          oldStartSequenceNumbers,
+          "Either startSequenceNumbers or startPartitions shoulnd't be null"
+      );
+
+      return new SeekableStreamStartSequenceNumbers<>(
+          oldStartSequenceNumbers.getStream(),
+          oldStartSequenceNumbers.getPartitionSequenceNumberMap(),
+          exclusiveStartSequenceNumberPartitions
+      );
+    } else {
+      return newStartSequenceNumbers;
+    }
+  }
+
+  /**
+   * This method is for compatibilty so that newer version of KafkaIndexTaskIOConfig can be read by
+   * old version of Druid. Note that this method returns end sequence numbers instead of start. This is because
+   * {@link SeekableStreamStartSequenceNumbers} didn't exist before.
+   */
+  @JsonProperty
+  @Deprecated
+  public SeekableStreamEndSequenceNumbers<String, String> getStartPartitions()
+  {
+    // Converting to start sequence numbers. This is allowed for Kafka because the start offset is always inclusive.
+    final SeekableStreamStartSequenceNumbers<String, String> startSequenceNumbers = getStartSequenceNumbers();
+    return new SeekableStreamEndSequenceNumbers<>(
+        startSequenceNumbers.getStream(),
+        startSequenceNumbers.getPartitionSequenceNumberMap()
+    );
+  }
+
+  /**
+   * This method is for compatibilty so that newer version of KafkaIndexTaskIOConfig can be read by
+   * old version of Druid.
+   */
+  @JsonProperty
+  @Deprecated
+  public SeekableStreamEndSequenceNumbers<String, String> getEndPartitions()
+  {
+    return getEndSequenceNumbers();
+  }
+
+  @JsonProperty
+  @Deprecated
+  public Set<String> getExclusiveStartSequenceNumberPartitions()
+  {
+    return getStartSequenceNumbers().getExclusivePartitions();
+  }
+
   @JsonProperty
   public String getEndpoint()
   {
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
index e0c7790..22393a8 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
@@ -19,21 +19,31 @@
 
 package org.apache.druid.indexing.kinesis;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.segment.indexing.IOConfig;
 import org.hamcrest.CoreMatchers;
+import org.joda.time.DateTime;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import javax.annotation.Nullable;
+import java.io.IOException;
 import java.util.Collections;
+import java.util.Set;
 
 public class KinesisIOConfigTest
 {
@@ -243,4 +253,232 @@ public class KinesisIOConfigTest
     exception.expectMessage(CoreMatchers.containsString("endpoint"));
     mapper.readValue(jsonStr, IOConfig.class);
   }
+
+  @Test
+  public void testDeserializeToOldIoConfig() throws IOException
+  {
+    final KinesisIndexTaskIOConfig currentConfig = new KinesisIndexTaskIOConfig(
+        0,
+        "baseSequenceName",
+        new SeekableStreamStartSequenceNumbers<>(
+            "stream",
+            ImmutableMap.of("1", "10L", "2", "5L"),
+            ImmutableSet.of("1")
+        ),
+        new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of("1", "20L", "2", "30L")),
+        true,
+        DateTimes.nowUtc(),
+        DateTimes.nowUtc(),
+        "endpoint",
+        1000,
+        2000,
+        "awsAssumedRoleArn",
+        "awsExternalId",
+        true
+    );
+
+    final byte[] json = mapper.writeValueAsBytes(currentConfig);
+    final ObjectMapper oldMapper = new DefaultObjectMapper();
+    oldMapper.registerSubtypes(new NamedType(OldKinesisIndexTaskIoConfig.class, "kinesis"));
+
+    final OldKinesisIndexTaskIoConfig oldConfig = (OldKinesisIndexTaskIoConfig) oldMapper.readValue(
+        json,
+        IOConfig.class
+    );
+
+    Assert.assertEquals(currentConfig.getBaseSequenceName(), oldConfig.getBaseSequenceName());
+    Assert.assertEquals(
+        currentConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap(),
+        oldConfig.getStartPartitions().getPartitionSequenceNumberMap()
+    );
+    Assert.assertEquals(
+        currentConfig.getStartSequenceNumbers().getExclusivePartitions(),
+        oldConfig.getExclusiveStartSequenceNumberPartitions()
+    );
+    Assert.assertEquals(currentConfig.getEndSequenceNumbers(), oldConfig.getEndPartitions());
+    Assert.assertEquals(currentConfig.isUseTransaction(), oldConfig.isUseTransaction());
+    Assert.assertEquals(currentConfig.getMinimumMessageTime(), oldConfig.getMinimumMessageTime());
+    Assert.assertEquals(currentConfig.getMaximumMessageTime(), oldConfig.getMaximumMessageTime());
+    Assert.assertEquals(currentConfig.getEndpoint(), oldConfig.getEndpoint());
+    Assert.assertEquals(currentConfig.getRecordsPerFetch(), oldConfig.getRecordsPerFetch());
+    Assert.assertEquals(currentConfig.getFetchDelayMillis(), oldConfig.getFetchDelayMillis());
+    Assert.assertEquals(currentConfig.getAwsAssumedRoleArn(), oldConfig.getAwsAssumedRoleArn());
+    Assert.assertEquals(currentConfig.getAwsExternalId(), oldConfig.getAwsExternalId());
+    Assert.assertEquals(currentConfig.isDeaggregate(), oldConfig.isDeaggregate());
+  }
+
+  @Test
+  public void testDeserializeFromOldIoConfig() throws IOException
+  {
+    final ObjectMapper oldMapper = new DefaultObjectMapper();
+    oldMapper.registerSubtypes(new NamedType(OldKinesisIndexTaskIoConfig.class, "kinesis"));
+
+    final OldKinesisIndexTaskIoConfig oldConfig = new OldKinesisIndexTaskIoConfig(
+        "baseSequenceName",
+        new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of("1", "10L", "2", "5L")),
+        new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of("1", "20L", "2", "30L")),
+        ImmutableSet.of("1"),
+        true,
+        DateTimes.nowUtc(),
+        DateTimes.nowUtc(),
+        "endpoint",
+        1000,
+        2000,
+        "awsAssumedRoleArn",
+        "awsExternalId",
+        true
+    );
+
+    final byte[] json = oldMapper.writeValueAsBytes(oldConfig);
+    final KinesisIndexTaskIOConfig currentConfig = (KinesisIndexTaskIOConfig) mapper.readValue(json, IOConfig.class);
+
+    Assert.assertNull(currentConfig.getTaskGroupId());
+    Assert.assertEquals(oldConfig.getBaseSequenceName(), currentConfig.getBaseSequenceName());
+    Assert.assertEquals(
+        oldConfig.getStartPartitions().getPartitionSequenceNumberMap(),
+        currentConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap()
+    );
+    Assert.assertEquals(
+        oldConfig.getExclusiveStartSequenceNumberPartitions(),
+        currentConfig.getStartSequenceNumbers().getExclusivePartitions()
+    );
+    Assert.assertEquals(oldConfig.getEndPartitions(), currentConfig.getEndSequenceNumbers());
+    Assert.assertEquals(oldConfig.isUseTransaction(), currentConfig.isUseTransaction());
+    Assert.assertEquals(oldConfig.getMinimumMessageTime(), currentConfig.getMinimumMessageTime());
+    Assert.assertEquals(oldConfig.getMaximumMessageTime(), currentConfig.getMaximumMessageTime());
+    Assert.assertEquals(oldConfig.getEndpoint(), currentConfig.getEndpoint());
+    Assert.assertEquals(oldConfig.getRecordsPerFetch(), currentConfig.getRecordsPerFetch());
+    Assert.assertEquals(oldConfig.getFetchDelayMillis(), currentConfig.getFetchDelayMillis());
+    Assert.assertEquals(oldConfig.getAwsAssumedRoleArn(), currentConfig.getAwsAssumedRoleArn());
+    Assert.assertEquals(oldConfig.getAwsExternalId(), currentConfig.getAwsExternalId());
+    Assert.assertEquals(oldConfig.isDeaggregate(), currentConfig.isDeaggregate());
+  }
+
+  private static class OldKinesisIndexTaskIoConfig implements IOConfig
+  {
+    private final String baseSequenceName;
+    private final SeekableStreamEndSequenceNumbers<String, String> startPartitions;
+    private final SeekableStreamEndSequenceNumbers<String, String> endPartitions;
+    private final Set<String> exclusiveStartSequenceNumberPartitions;
+    private final boolean useTransaction;
+    private final Optional<DateTime> minimumMessageTime;
+    private final Optional<DateTime> maximumMessageTime;
+    private final String endpoint;
+    private final Integer recordsPerFetch;
+    private final Integer fetchDelayMillis;
+
+    private final String awsAssumedRoleArn;
+    private final String awsExternalId;
+    private final boolean deaggregate;
+
+    @JsonCreator
+    private OldKinesisIndexTaskIoConfig(
+        @JsonProperty("baseSequenceName") String baseSequenceName,
+        @JsonProperty("startPartitions") @Nullable SeekableStreamEndSequenceNumbers<String, String> startPartitions,
+        @JsonProperty("endPartitions") @Nullable SeekableStreamEndSequenceNumbers<String, String> endPartitions,
+        @JsonProperty("exclusiveStartSequenceNumberPartitions") Set<String> exclusiveStartSequenceNumberPartitions,
+        @JsonProperty("useTransaction") Boolean useTransaction,
+        @JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
+        @JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
+        @JsonProperty("endpoint") String endpoint,
+        @JsonProperty("recordsPerFetch") Integer recordsPerFetch,
+        @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
+        @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
+        @JsonProperty("awsExternalId") String awsExternalId,
+        @JsonProperty("deaggregate") boolean deaggregate
+    )
+    {
+      this.baseSequenceName = baseSequenceName;
+      this.startPartitions = startPartitions;
+      this.endPartitions = endPartitions;
+      this.exclusiveStartSequenceNumberPartitions = exclusiveStartSequenceNumberPartitions;
+      this.useTransaction = useTransaction;
+      this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
+      this.maximumMessageTime = Optional.fromNullable(maximumMessageTime);
+      this.endpoint = endpoint;
+      this.recordsPerFetch = recordsPerFetch;
+      this.fetchDelayMillis = fetchDelayMillis;
+      this.awsAssumedRoleArn = awsAssumedRoleArn;
+      this.awsExternalId = awsExternalId;
+      this.deaggregate = deaggregate;
+    }
+
+    @JsonProperty
+    public String getBaseSequenceName()
+    {
+      return baseSequenceName;
+    }
+
+    @JsonProperty
+    public SeekableStreamEndSequenceNumbers<String, String> getStartPartitions()
+    {
+      return startPartitions;
+    }
+
+    @JsonProperty
+    public SeekableStreamEndSequenceNumbers<String, String> getEndPartitions()
+    {
+      return endPartitions;
+    }
+
+    @JsonProperty
+    public Set<String> getExclusiveStartSequenceNumberPartitions()
+    {
+      return exclusiveStartSequenceNumberPartitions;
+    }
+
+    @JsonProperty
+    public boolean isUseTransaction()
+    {
+      return useTransaction;
+    }
+
+    @JsonProperty
+    public Optional<DateTime> getMinimumMessageTime()
+    {
+      return minimumMessageTime;
+    }
+
+    @JsonProperty
+    public Optional<DateTime> getMaximumMessageTime()
+    {
+      return maximumMessageTime;
+    }
+
+    @JsonProperty
+    public String getEndpoint()
+    {
+      return endpoint;
+    }
+
+    @JsonProperty
+    public int getRecordsPerFetch()
+    {
+      return recordsPerFetch;
+    }
+
+    @JsonProperty
+    public int getFetchDelayMillis()
+    {
+      return fetchDelayMillis;
+    }
+
+    @JsonProperty
+    public String getAwsAssumedRoleArn()
+    {
+      return awsAssumedRoleArn;
+    }
+
+    @JsonProperty
+    public String getAwsExternalId()
+    {
+      return awsExternalId;
+    }
+
+    @JsonProperty
+    public boolean isDeaggregate()
+    {
+      return deaggregate;
+    }
+  }
 }
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index bbdd2dd..969cc39 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -399,7 +399,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -469,7 +469,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "0"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1")),
@@ -557,7 +557,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             baseSequenceName,
             startPartitions,
             endPartitions,
@@ -683,7 +683,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             baseSequenceName,
             startPartitions,
             endPartitions,
@@ -795,7 +795,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -864,7 +864,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -944,7 +944,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
             )
         ),
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1015,7 +1015,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2")),
@@ -1071,7 +1071,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1140,7 +1140,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1209,7 +1209,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "5")),
@@ -1268,7 +1268,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "12")),
@@ -1366,7 +1366,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "9")),
@@ -1448,7 +1448,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task1 = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1466,7 +1466,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task2 = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1540,7 +1540,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task1 = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1558,7 +1558,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task2 = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            1,
             "sequence1",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "3"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "9")),
@@ -1630,7 +1630,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task1 = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1648,7 +1648,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task2 = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            1,
             "sequence1",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "3"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "9")),
@@ -1724,7 +1724,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence1",
             new SeekableStreamStartSequenceNumbers<>(
                 stream,
@@ -1808,7 +1808,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task1 = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1826,7 +1826,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task2 = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            1,
             "sequence1",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "0"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1")),
@@ -1901,7 +1901,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task1 = createTask(
         "task1",
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "5")),
@@ -1950,7 +1950,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task2 = createTask(
         task1.getId(),
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "5")),
@@ -2034,7 +2034,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task1 = createTask(
         "task1",
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "6")),
@@ -2095,7 +2095,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task2 = createTask(
         task1.getId(),
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "6")),
@@ -2160,7 +2160,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         "task1",
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "13")),
@@ -2283,7 +2283,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         "task1",
         DATA_SCHEMA,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -2380,7 +2380,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         ImmutableMap.of(shardId1, "100") // simulating unlimited
     );
     final KinesisIndexTaskIOConfig ioConfig = new KinesisIndexTaskIOConfig(
-        null,
+        0,
         baseSequenceName,
         startPartitions,
         endPartitions,
@@ -2493,7 +2493,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         "task1",
         DATA_SCHEMA,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(
                 stream,
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 73c1d46..4b138f6 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -3550,7 +3550,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
         getDataSchema(dataSource),
         tuningConfig,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequenceName-" + taskGroupId,
             startPartitions,
             endPartitions,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org