You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2019/03/26 21:46:25 UTC

[incubator-druid] branch kinesis-compatibility created (now 1352ee1)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a change to branch kinesis-compatibility
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git.


      at 1352ee1  Support kinesis compatibility

This branch includes the following new commits:

     new 1352ee1  Support kinesis compatibility

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[incubator-druid] 01/01: Support kinesis compatibility

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch kinesis-compatibility
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git

commit 1352ee1dbf69aa462fd2ac308774dd12bdc688a1
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Tue Mar 26 10:50:16 2019 -0700

    Support kinesis compatibility
---
 .../indexing/kafka/KafkaIndexTaskIOConfig.java     |   8 +-
 .../indexing/kinesis/KinesisIndexTaskIOConfig.java | 117 +++++++++-
 .../indexing/kinesis/KinesisIOConfigTest.java      | 238 +++++++++++++++++++++
 .../indexing/kinesis/KinesisIndexTaskTest.java     |  60 +++---
 .../kinesis/supervisor/KinesisSupervisorTest.java  |   2 +-
 5 files changed, 386 insertions(+), 39 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
index 2200e47..37366f3 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
@@ -41,8 +41,10 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
       @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // can be null for backward compabitility
       @JsonProperty("baseSequenceName") String baseSequenceName,
       // startPartitions and endPartitions exist to be able to read old ioConfigs in metadata store
-      @JsonProperty("startPartitions") @Nullable SeekableStreamEndSequenceNumbers<Integer, Long> startPartitions,
-      @JsonProperty("endPartitions") @Nullable SeekableStreamEndSequenceNumbers<Integer, Long> endPartitions,
+      @JsonProperty("startPartitions") @Nullable
+      @Deprecated SeekableStreamEndSequenceNumbers<Integer, Long> startPartitions,
+      @JsonProperty("endPartitions") @Nullable
+      @Deprecated SeekableStreamEndSequenceNumbers<Integer, Long> endPartitions,
       // startSequenceNumbers and endSequenceNumbers must be set for new versions
       @JsonProperty("startSequenceNumbers")
       @Nullable SeekableStreamStartSequenceNumbers<Integer, Long> startSequenceNumbers,
@@ -115,6 +117,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
    * {@link SeekableStreamStartSequenceNumbers} didn't exist before.
    */
   @JsonProperty
+  @Deprecated
   public SeekableStreamEndSequenceNumbers<Integer, Long> getStartPartitions()
   {
     // Converting to start sequence numbers. This is allowed for Kafka because the start offset is always inclusive.
@@ -130,6 +133,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
    * old version of Druid.
    */
   @JsonProperty
+  @Deprecated
   public SeekableStreamEndSequenceNumbers<Integer, Long> getEndPartitions()
   {
     return getEndSequenceNumbers();
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
index f312dd6..e726ae2 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
@@ -28,6 +28,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbe
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
+import java.util.Set;
 
 public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<String, String>
 {
@@ -46,6 +47,17 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St
   public KinesisIndexTaskIOConfig(
       @JsonProperty("taskGroupId") @Nullable Integer taskGroupId,
       @JsonProperty("baseSequenceName") String baseSequenceName,
+      // below three deprecated variables exist to be able to read old ioConfigs in metadata store
+      @JsonProperty("startPartitions")
+      @Nullable
+      @Deprecated SeekableStreamEndSequenceNumbers<String, String> startPartitions,
+      @JsonProperty("endPartitions")
+      @Nullable
+      @Deprecated SeekableStreamEndSequenceNumbers<String, String> endPartitions,
+      @JsonProperty("exclusiveStartSequenceNumberPartitions")
+      @Nullable
+      @Deprecated Set<String> exclusiveStartSequenceNumberPartitions,
+      // startSequenceNumbers and endSequenceNumbers must be set for new versions
       @JsonProperty("startSequenceNumbers") SeekableStreamStartSequenceNumbers<String, String> startSequenceNumbers,
       @JsonProperty("endSequenceNumbers") SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers,
       @JsonProperty("useTransaction") Boolean useTransaction,
@@ -62,17 +74,17 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St
     super(
         taskGroupId,
         baseSequenceName,
-        startSequenceNumbers,
-        endSequenceNumbers,
+        getStartSequenceNumbers(startSequenceNumbers, startPartitions, exclusiveStartSequenceNumberPartitions),
+        endSequenceNumbers == null ? endPartitions : endSequenceNumbers,
         useTransaction,
         minimumMessageTime,
         maximumMessageTime
     );
     Preconditions.checkArgument(
-        endSequenceNumbers.getPartitionSequenceNumberMap()
-                          .values()
-                          .stream()
-                          .noneMatch(x -> x.equals(KinesisSequenceNumber.END_OF_SHARD_MARKER)),
+        getEndSequenceNumbers().getPartitionSequenceNumberMap()
+                               .values()
+                               .stream()
+                               .noneMatch(x -> x.equals(KinesisSequenceNumber.END_OF_SHARD_MARKER)),
         "End sequenceNumbers must not have the end of shard marker (EOS)"
     );
 
@@ -84,6 +96,99 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St
     this.deaggregate = deaggregate;
   }
 
+  public KinesisIndexTaskIOConfig(
+      int taskGroupId,
+      String baseSequenceName,
+      SeekableStreamStartSequenceNumbers<String, String> startSequenceNumbers,
+      SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers,
+      Boolean useTransaction,
+      DateTime minimumMessageTime,
+      DateTime maximumMessageTime,
+      String endpoint,
+      Integer recordsPerFetch,
+      Integer fetchDelayMillis,
+      String awsAssumedRoleArn,
+      String awsExternalId,
+      boolean deaggregate
+  )
+  {
+    this(
+        taskGroupId,
+        baseSequenceName,
+        null,
+        null,
+        null,
+        startSequenceNumbers,
+        endSequenceNumbers,
+        useTransaction,
+        minimumMessageTime,
+        maximumMessageTime,
+        endpoint,
+        recordsPerFetch,
+        fetchDelayMillis,
+        awsAssumedRoleArn,
+        awsExternalId,
+        deaggregate
+    );
+  }
+
+  private static SeekableStreamStartSequenceNumbers<String, String> getStartSequenceNumbers(
+      @Nullable SeekableStreamStartSequenceNumbers<String, String> newStartSequenceNumbers,
+      @Nullable SeekableStreamEndSequenceNumbers<String, String> oldStartSequenceNumbers,
+      @Nullable Set<String> exclusiveStartSequenceNumberPartitions
+  )
+  {
+    if (newStartSequenceNumbers == null) {
+      Preconditions.checkNotNull(
+          oldStartSequenceNumbers,
+          "Either startSequenceNumbers or startPartitions shoulnd't be null"
+      );
+
+      return new SeekableStreamStartSequenceNumbers<>(
+          oldStartSequenceNumbers.getStream(),
+          oldStartSequenceNumbers.getPartitionSequenceNumberMap(),
+          exclusiveStartSequenceNumberPartitions
+      );
+    } else {
+      return newStartSequenceNumbers;
+    }
+  }
+
+  /**
+   * This method is for compatibility so that newer version of KinesisIndexTaskIOConfig can be read by
+   * old version of Druid. Note that this method returns end sequence numbers instead of start. This is because
+   * {@link SeekableStreamStartSequenceNumbers} didn't exist before.
+   */
+  @JsonProperty
+  @Deprecated
+  public SeekableStreamEndSequenceNumbers<String, String> getStartPartitions()
+  {
+    // Converting to end sequence numbers. Exclusivity is reported by getExclusiveStartSequenceNumberPartitions().
+    final SeekableStreamStartSequenceNumbers<String, String> startSequenceNumbers = getStartSequenceNumbers();
+    return new SeekableStreamEndSequenceNumbers<>(
+        startSequenceNumbers.getStream(),
+        startSequenceNumbers.getPartitionSequenceNumberMap()
+    );
+  }
+
+  /**
+   * This method is for compatibility so that newer version of KinesisIndexTaskIOConfig can be read by
+   * old version of Druid.
+   */
+  @JsonProperty
+  @Deprecated
+  public SeekableStreamEndSequenceNumbers<String, String> getEndPartitions()
+  {
+    return getEndSequenceNumbers();
+  }
+
+  @JsonProperty
+  @Deprecated
+  public Set<String> getExclusiveStartSequenceNumberPartitions()
+  {
+    return getStartSequenceNumbers().getExclusivePartitions();
+  }
+
   @JsonProperty
   public String getEndpoint()
   {
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
index e0c7790..22393a8 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
@@ -19,21 +19,31 @@
 
 package org.apache.druid.indexing.kinesis;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.segment.indexing.IOConfig;
 import org.hamcrest.CoreMatchers;
+import org.joda.time.DateTime;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import javax.annotation.Nullable;
+import java.io.IOException;
 import java.util.Collections;
+import java.util.Set;
 
 public class KinesisIOConfigTest
 {
@@ -243,4 +253,232 @@ public class KinesisIOConfigTest
     exception.expectMessage(CoreMatchers.containsString("endpoint"));
     mapper.readValue(jsonStr, IOConfig.class);
   }
+
+  @Test
+  public void testDeserializeToOldIoConfig() throws IOException
+  {
+    final KinesisIndexTaskIOConfig currentConfig = new KinesisIndexTaskIOConfig(
+        0,
+        "baseSequenceName",
+        new SeekableStreamStartSequenceNumbers<>(
+            "stream",
+            ImmutableMap.of("1", "10L", "2", "5L"),
+            ImmutableSet.of("1")
+        ),
+        new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of("1", "20L", "2", "30L")),
+        true,
+        DateTimes.nowUtc(),
+        DateTimes.nowUtc(),
+        "endpoint",
+        1000,
+        2000,
+        "awsAssumedRoleArn",
+        "awsExternalId",
+        true
+    );
+
+    final byte[] json = mapper.writeValueAsBytes(currentConfig);
+    final ObjectMapper oldMapper = new DefaultObjectMapper();
+    oldMapper.registerSubtypes(new NamedType(OldKinesisIndexTaskIoConfig.class, "kinesis"));
+
+    final OldKinesisIndexTaskIoConfig oldConfig = (OldKinesisIndexTaskIoConfig) oldMapper.readValue(
+        json,
+        IOConfig.class
+    );
+
+    Assert.assertEquals(currentConfig.getBaseSequenceName(), oldConfig.getBaseSequenceName());
+    Assert.assertEquals(
+        currentConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap(),
+        oldConfig.getStartPartitions().getPartitionSequenceNumberMap()
+    );
+    Assert.assertEquals(
+        currentConfig.getStartSequenceNumbers().getExclusivePartitions(),
+        oldConfig.getExclusiveStartSequenceNumberPartitions()
+    );
+    Assert.assertEquals(currentConfig.getEndSequenceNumbers(), oldConfig.getEndPartitions());
+    Assert.assertEquals(currentConfig.isUseTransaction(), oldConfig.isUseTransaction());
+    Assert.assertEquals(currentConfig.getMinimumMessageTime(), oldConfig.getMinimumMessageTime());
+    Assert.assertEquals(currentConfig.getMaximumMessageTime(), oldConfig.getMaximumMessageTime());
+    Assert.assertEquals(currentConfig.getEndpoint(), oldConfig.getEndpoint());
+    Assert.assertEquals(currentConfig.getRecordsPerFetch(), oldConfig.getRecordsPerFetch());
+    Assert.assertEquals(currentConfig.getFetchDelayMillis(), oldConfig.getFetchDelayMillis());
+    Assert.assertEquals(currentConfig.getAwsAssumedRoleArn(), oldConfig.getAwsAssumedRoleArn());
+    Assert.assertEquals(currentConfig.getAwsExternalId(), oldConfig.getAwsExternalId());
+    Assert.assertEquals(currentConfig.isDeaggregate(), oldConfig.isDeaggregate());
+  }
+
+  @Test
+  public void testDeserializeFromOldIoConfig() throws IOException
+  {
+    final ObjectMapper oldMapper = new DefaultObjectMapper();
+    oldMapper.registerSubtypes(new NamedType(OldKinesisIndexTaskIoConfig.class, "kinesis"));
+
+    final OldKinesisIndexTaskIoConfig oldConfig = new OldKinesisIndexTaskIoConfig(
+        "baseSequenceName",
+        new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of("1", "10L", "2", "5L")),
+        new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of("1", "20L", "2", "30L")),
+        ImmutableSet.of("1"),
+        true,
+        DateTimes.nowUtc(),
+        DateTimes.nowUtc(),
+        "endpoint",
+        1000,
+        2000,
+        "awsAssumedRoleArn",
+        "awsExternalId",
+        true
+    );
+
+    final byte[] json = oldMapper.writeValueAsBytes(oldConfig);
+    final KinesisIndexTaskIOConfig currentConfig = (KinesisIndexTaskIOConfig) mapper.readValue(json, IOConfig.class);
+
+    Assert.assertNull(currentConfig.getTaskGroupId());
+    Assert.assertEquals(oldConfig.getBaseSequenceName(), currentConfig.getBaseSequenceName());
+    Assert.assertEquals(
+        oldConfig.getStartPartitions().getPartitionSequenceNumberMap(),
+        currentConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap()
+    );
+    Assert.assertEquals(
+        oldConfig.getExclusiveStartSequenceNumberPartitions(),
+        currentConfig.getStartSequenceNumbers().getExclusivePartitions()
+    );
+    Assert.assertEquals(oldConfig.getEndPartitions(), currentConfig.getEndSequenceNumbers());
+    Assert.assertEquals(oldConfig.isUseTransaction(), currentConfig.isUseTransaction());
+    Assert.assertEquals(oldConfig.getMinimumMessageTime(), currentConfig.getMinimumMessageTime());
+    Assert.assertEquals(oldConfig.getMaximumMessageTime(), currentConfig.getMaximumMessageTime());
+    Assert.assertEquals(oldConfig.getEndpoint(), currentConfig.getEndpoint());
+    Assert.assertEquals(oldConfig.getRecordsPerFetch(), currentConfig.getRecordsPerFetch());
+    Assert.assertEquals(oldConfig.getFetchDelayMillis(), currentConfig.getFetchDelayMillis());
+    Assert.assertEquals(oldConfig.getAwsAssumedRoleArn(), currentConfig.getAwsAssumedRoleArn());
+    Assert.assertEquals(oldConfig.getAwsExternalId(), currentConfig.getAwsExternalId());
+    Assert.assertEquals(oldConfig.isDeaggregate(), currentConfig.isDeaggregate());
+  }
+
+  private static class OldKinesisIndexTaskIoConfig implements IOConfig
+  {
+    private final String baseSequenceName;
+    private final SeekableStreamEndSequenceNumbers<String, String> startPartitions;
+    private final SeekableStreamEndSequenceNumbers<String, String> endPartitions;
+    private final Set<String> exclusiveStartSequenceNumberPartitions;
+    private final boolean useTransaction;
+    private final Optional<DateTime> minimumMessageTime;
+    private final Optional<DateTime> maximumMessageTime;
+    private final String endpoint;
+    private final Integer recordsPerFetch;
+    private final Integer fetchDelayMillis;
+
+    private final String awsAssumedRoleArn;
+    private final String awsExternalId;
+    private final boolean deaggregate;
+
+    @JsonCreator
+    private OldKinesisIndexTaskIoConfig(
+        @JsonProperty("baseSequenceName") String baseSequenceName,
+        @JsonProperty("startPartitions") @Nullable SeekableStreamEndSequenceNumbers<String, String> startPartitions,
+        @JsonProperty("endPartitions") @Nullable SeekableStreamEndSequenceNumbers<String, String> endPartitions,
+        @JsonProperty("exclusiveStartSequenceNumberPartitions") Set<String> exclusiveStartSequenceNumberPartitions,
+        @JsonProperty("useTransaction") Boolean useTransaction,
+        @JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
+        @JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
+        @JsonProperty("endpoint") String endpoint,
+        @JsonProperty("recordsPerFetch") Integer recordsPerFetch,
+        @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
+        @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
+        @JsonProperty("awsExternalId") String awsExternalId,
+        @JsonProperty("deaggregate") boolean deaggregate
+    )
+    {
+      this.baseSequenceName = baseSequenceName;
+      this.startPartitions = startPartitions;
+      this.endPartitions = endPartitions;
+      this.exclusiveStartSequenceNumberPartitions = exclusiveStartSequenceNumberPartitions;
+      this.useTransaction = useTransaction;
+      this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
+      this.maximumMessageTime = Optional.fromNullable(maximumMessageTime);
+      this.endpoint = endpoint;
+      this.recordsPerFetch = recordsPerFetch;
+      this.fetchDelayMillis = fetchDelayMillis;
+      this.awsAssumedRoleArn = awsAssumedRoleArn;
+      this.awsExternalId = awsExternalId;
+      this.deaggregate = deaggregate;
+    }
+
+    @JsonProperty
+    public String getBaseSequenceName()
+    {
+      return baseSequenceName;
+    }
+
+    @JsonProperty
+    public SeekableStreamEndSequenceNumbers<String, String> getStartPartitions()
+    {
+      return startPartitions;
+    }
+
+    @JsonProperty
+    public SeekableStreamEndSequenceNumbers<String, String> getEndPartitions()
+    {
+      return endPartitions;
+    }
+
+    @JsonProperty
+    public Set<String> getExclusiveStartSequenceNumberPartitions()
+    {
+      return exclusiveStartSequenceNumberPartitions;
+    }
+
+    @JsonProperty
+    public boolean isUseTransaction()
+    {
+      return useTransaction;
+    }
+
+    @JsonProperty
+    public Optional<DateTime> getMinimumMessageTime()
+    {
+      return minimumMessageTime;
+    }
+
+    @JsonProperty
+    public Optional<DateTime> getMaximumMessageTime()
+    {
+      return maximumMessageTime;
+    }
+
+    @JsonProperty
+    public String getEndpoint()
+    {
+      return endpoint;
+    }
+
+    @JsonProperty
+    public int getRecordsPerFetch()
+    {
+      return recordsPerFetch;
+    }
+
+    @JsonProperty
+    public int getFetchDelayMillis()
+    {
+      return fetchDelayMillis;
+    }
+
+    @JsonProperty
+    public String getAwsAssumedRoleArn()
+    {
+      return awsAssumedRoleArn;
+    }
+
+    @JsonProperty
+    public String getAwsExternalId()
+    {
+      return awsExternalId;
+    }
+
+    @JsonProperty
+    public boolean isDeaggregate()
+    {
+      return deaggregate;
+    }
+  }
 }
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index bbdd2dd..969cc39 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -399,7 +399,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -469,7 +469,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "0"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1")),
@@ -557,7 +557,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             baseSequenceName,
             startPartitions,
             endPartitions,
@@ -683,7 +683,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             baseSequenceName,
             startPartitions,
             endPartitions,
@@ -795,7 +795,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -864,7 +864,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -944,7 +944,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
             )
         ),
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1015,7 +1015,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2")),
@@ -1071,7 +1071,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1140,7 +1140,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1209,7 +1209,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "5")),
@@ -1268,7 +1268,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "12")),
@@ -1366,7 +1366,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "9")),
@@ -1448,7 +1448,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task1 = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1466,7 +1466,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task2 = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1540,7 +1540,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task1 = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1558,7 +1558,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task2 = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            1,
             "sequence1",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "3"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "9")),
@@ -1630,7 +1630,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task1 = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1648,7 +1648,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task2 = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            1,
             "sequence1",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "3"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "9")),
@@ -1724,7 +1724,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence1",
             new SeekableStreamStartSequenceNumbers<>(
                 stream,
@@ -1808,7 +1808,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task1 = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1826,7 +1826,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task2 = createTask(
         null,
         new KinesisIndexTaskIOConfig(
-            null,
+            1,
             "sequence1",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "0"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1")),
@@ -1901,7 +1901,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task1 = createTask(
         "task1",
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "5")),
@@ -1950,7 +1950,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task2 = createTask(
         task1.getId(),
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "5")),
@@ -2034,7 +2034,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task1 = createTask(
         "task1",
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "6")),
@@ -2095,7 +2095,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task2 = createTask(
         task1.getId(),
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "6")),
@@ -2160,7 +2160,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
     final KinesisIndexTask task = createTask(
         "task1",
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "13")),
@@ -2283,7 +2283,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         "task1",
         DATA_SCHEMA,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
             new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -2380,7 +2380,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         ImmutableMap.of(shardId1, "100") // simulating unlimited
     );
     final KinesisIndexTaskIOConfig ioConfig = new KinesisIndexTaskIOConfig(
-        null,
+        0,
         baseSequenceName,
         startPartitions,
         endPartitions,
@@ -2493,7 +2493,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
         "task1",
         DATA_SCHEMA,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequence0",
             new SeekableStreamStartSequenceNumbers<>(
                 stream,
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 73c1d46..4b138f6 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -3550,7 +3550,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
         getDataSchema(dataSource),
         tuningConfig,
         new KinesisIndexTaskIOConfig(
-            null,
+            0,
             "sequenceName-" + taskGroupId,
             startPartitions,
             endPartitions,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org