You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2019/03/26 21:46:26 UTC
[incubator-druid] 01/01: Support kinesis compatibility
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch kinesis-compatibility
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
commit 1352ee1dbf69aa462fd2ac308774dd12bdc688a1
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Tue Mar 26 10:50:16 2019 -0700
Support kinesis compatibility
---
.../indexing/kafka/KafkaIndexTaskIOConfig.java | 8 +-
.../indexing/kinesis/KinesisIndexTaskIOConfig.java | 117 +++++++++-
.../indexing/kinesis/KinesisIOConfigTest.java | 238 +++++++++++++++++++++
.../indexing/kinesis/KinesisIndexTaskTest.java | 60 +++---
.../kinesis/supervisor/KinesisSupervisorTest.java | 2 +-
5 files changed, 386 insertions(+), 39 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
index 2200e47..37366f3 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskIOConfig.java
@@ -41,8 +41,10 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
@JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // can be null for backward compabitility
@JsonProperty("baseSequenceName") String baseSequenceName,
// startPartitions and endPartitions exist to be able to read old ioConfigs in metadata store
- @JsonProperty("startPartitions") @Nullable SeekableStreamEndSequenceNumbers<Integer, Long> startPartitions,
- @JsonProperty("endPartitions") @Nullable SeekableStreamEndSequenceNumbers<Integer, Long> endPartitions,
+ @JsonProperty("startPartitions") @Nullable
+ @Deprecated SeekableStreamEndSequenceNumbers<Integer, Long> startPartitions,
+ @JsonProperty("endPartitions") @Nullable
+ @Deprecated SeekableStreamEndSequenceNumbers<Integer, Long> endPartitions,
// startSequenceNumbers and endSequenceNumbers must be set for new versions
@JsonProperty("startSequenceNumbers")
@Nullable SeekableStreamStartSequenceNumbers<Integer, Long> startSequenceNumbers,
@@ -115,6 +117,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
* {@link SeekableStreamStartSequenceNumbers} didn't exist before.
*/
@JsonProperty
+ @Deprecated
public SeekableStreamEndSequenceNumbers<Integer, Long> getStartPartitions()
{
// Converting to start sequence numbers. This is allowed for Kafka because the start offset is always inclusive.
@@ -130,6 +133,7 @@ public class KafkaIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<Inte
* old version of Druid.
*/
@JsonProperty
+ @Deprecated
public SeekableStreamEndSequenceNumbers<Integer, Long> getEndPartitions()
{
return getEndSequenceNumbers();
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
index f312dd6..e726ae2 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java
@@ -28,6 +28,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbe
import org.joda.time.DateTime;
import javax.annotation.Nullable;
+import java.util.Set;
public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<String, String>
{
@@ -46,6 +47,17 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St
public KinesisIndexTaskIOConfig(
@JsonProperty("taskGroupId") @Nullable Integer taskGroupId,
@JsonProperty("baseSequenceName") String baseSequenceName,
+ // below three deprecated variables exist to be able to read old ioConfigs in metadata store
+ @JsonProperty("startPartitions")
+ @Nullable
+ @Deprecated SeekableStreamEndSequenceNumbers<String, String> startPartitions,
+ @JsonProperty("endPartitions")
+ @Nullable
+ @Deprecated SeekableStreamEndSequenceNumbers<String, String> endPartitions,
+ @JsonProperty("exclusiveStartSequenceNumberPartitions")
+ @Nullable
+ @Deprecated Set<String> exclusiveStartSequenceNumberPartitions,
+ // startSequenceNumbers and endSequenceNumbers must be set for new versions
@JsonProperty("startSequenceNumbers") SeekableStreamStartSequenceNumbers<String, String> startSequenceNumbers,
@JsonProperty("endSequenceNumbers") SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers,
@JsonProperty("useTransaction") Boolean useTransaction,
@@ -62,17 +74,17 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St
super(
taskGroupId,
baseSequenceName,
- startSequenceNumbers,
- endSequenceNumbers,
+ getStartSequenceNumbers(startSequenceNumbers, startPartitions, exclusiveStartSequenceNumberPartitions),
+ endSequenceNumbers == null ? endPartitions : endSequenceNumbers,
useTransaction,
minimumMessageTime,
maximumMessageTime
);
Preconditions.checkArgument(
- endSequenceNumbers.getPartitionSequenceNumberMap()
- .values()
- .stream()
- .noneMatch(x -> x.equals(KinesisSequenceNumber.END_OF_SHARD_MARKER)),
+ getEndSequenceNumbers().getPartitionSequenceNumberMap()
+ .values()
+ .stream()
+ .noneMatch(x -> x.equals(KinesisSequenceNumber.END_OF_SHARD_MARKER)),
"End sequenceNumbers must not have the end of shard marker (EOS)"
);
@@ -84,6 +96,99 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St
this.deaggregate = deaggregate;
}
+ public KinesisIndexTaskIOConfig(
+ int taskGroupId,
+ String baseSequenceName,
+ SeekableStreamStartSequenceNumbers<String, String> startSequenceNumbers,
+ SeekableStreamEndSequenceNumbers<String, String> endSequenceNumbers,
+ Boolean useTransaction,
+ DateTime minimumMessageTime,
+ DateTime maximumMessageTime,
+ String endpoint,
+ Integer recordsPerFetch,
+ Integer fetchDelayMillis,
+ String awsAssumedRoleArn,
+ String awsExternalId,
+ boolean deaggregate
+ )
+ {
+ this(
+ taskGroupId,
+ baseSequenceName,
+ null,
+ null,
+ null,
+ startSequenceNumbers,
+ endSequenceNumbers,
+ useTransaction,
+ minimumMessageTime,
+ maximumMessageTime,
+ endpoint,
+ recordsPerFetch,
+ fetchDelayMillis,
+ awsAssumedRoleArn,
+ awsExternalId,
+ deaggregate
+ );
+ }
+
+ private static SeekableStreamStartSequenceNumbers<String, String> getStartSequenceNumbers(
+ @Nullable SeekableStreamStartSequenceNumbers<String, String> newStartSequenceNumbers,
+ @Nullable SeekableStreamEndSequenceNumbers<String, String> oldStartSequenceNumbers,
+ @Nullable Set<String> exclusiveStartSequenceNumberPartitions
+ )
+ {
+ if (newStartSequenceNumbers == null) {
+ Preconditions.checkNotNull(
+ oldStartSequenceNumbers,
+ "Either startSequenceNumbers or startPartitions shouldn't be null"
+ );
+
+ return new SeekableStreamStartSequenceNumbers<>(
+ oldStartSequenceNumbers.getStream(),
+ oldStartSequenceNumbers.getPartitionSequenceNumberMap(),
+ exclusiveStartSequenceNumberPartitions
+ );
+ } else {
+ return newStartSequenceNumbers;
+ }
+ }
+
+ /**
+ * This method is for compatibility so that a newer version of KinesisIndexTaskIOConfig can be read by
+ * an old version of Druid. Note that this method returns end sequence numbers instead of start. This is because
+ * {@link SeekableStreamStartSequenceNumbers} didn't exist before.
+ */
+ @JsonProperty
+ @Deprecated
+ public SeekableStreamEndSequenceNumbers<String, String> getStartPartitions()
+ {
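+ // Converting to end sequence numbers so that old versions of Druid can read them. Exclusive start partitions
+ // cannot be expressed in that type, so they are exposed via getExclusiveStartSequenceNumberPartitions().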
+ final SeekableStreamStartSequenceNumbers<String, String> startSequenceNumbers = getStartSequenceNumbers();
+ return new SeekableStreamEndSequenceNumbers<>(
+ startSequenceNumbers.getStream(),
+ startSequenceNumbers.getPartitionSequenceNumberMap()
+ );
+ }
+
+ /**
+ * This method is for compatibility so that a newer version of KinesisIndexTaskIOConfig can be read by
+ * an old version of Druid.
+ */
+ @JsonProperty
+ @Deprecated
+ public SeekableStreamEndSequenceNumbers<String, String> getEndPartitions()
+ {
+ return getEndSequenceNumbers();
+ }
+
+ @JsonProperty
+ @Deprecated
+ public Set<String> getExclusiveStartSequenceNumberPartitions()
+ {
+ return getStartSequenceNumbers().getExclusivePartitions();
+ }
+
@JsonProperty
public String getEndpoint()
{
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
index e0c7790..22393a8 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java
@@ -19,21 +19,31 @@
package org.apache.druid.indexing.kinesis;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.indexing.IOConfig;
import org.hamcrest.CoreMatchers;
+import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.Collections;
+import java.util.Set;
public class KinesisIOConfigTest
{
@@ -243,4 +253,232 @@ public class KinesisIOConfigTest
exception.expectMessage(CoreMatchers.containsString("endpoint"));
mapper.readValue(jsonStr, IOConfig.class);
}
+
+ @Test
+ public void testDeserializeToOldIoConfig() throws IOException
+ {
+ final KinesisIndexTaskIOConfig currentConfig = new KinesisIndexTaskIOConfig(
+ 0,
+ "baseSequenceName",
+ new SeekableStreamStartSequenceNumbers<>(
+ "stream",
+ ImmutableMap.of("1", "10L", "2", "5L"),
+ ImmutableSet.of("1")
+ ),
+ new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of("1", "20L", "2", "30L")),
+ true,
+ DateTimes.nowUtc(),
+ DateTimes.nowUtc(),
+ "endpoint",
+ 1000,
+ 2000,
+ "awsAssumedRoleArn",
+ "awsExternalId",
+ true
+ );
+
+ final byte[] json = mapper.writeValueAsBytes(currentConfig);
+ final ObjectMapper oldMapper = new DefaultObjectMapper();
+ oldMapper.registerSubtypes(new NamedType(OldKinesisIndexTaskIoConfig.class, "kinesis"));
+
+ final OldKinesisIndexTaskIoConfig oldConfig = (OldKinesisIndexTaskIoConfig) oldMapper.readValue(
+ json,
+ IOConfig.class
+ );
+
+ Assert.assertEquals(currentConfig.getBaseSequenceName(), oldConfig.getBaseSequenceName());
+ Assert.assertEquals(
+ currentConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap(),
+ oldConfig.getStartPartitions().getPartitionSequenceNumberMap()
+ );
+ Assert.assertEquals(
+ currentConfig.getStartSequenceNumbers().getExclusivePartitions(),
+ oldConfig.getExclusiveStartSequenceNumberPartitions()
+ );
+ Assert.assertEquals(currentConfig.getEndSequenceNumbers(), oldConfig.getEndPartitions());
+ Assert.assertEquals(currentConfig.isUseTransaction(), oldConfig.isUseTransaction());
+ Assert.assertEquals(currentConfig.getMinimumMessageTime(), oldConfig.getMinimumMessageTime());
+ Assert.assertEquals(currentConfig.getMaximumMessageTime(), oldConfig.getMaximumMessageTime());
+ Assert.assertEquals(currentConfig.getEndpoint(), oldConfig.getEndpoint());
+ Assert.assertEquals(currentConfig.getRecordsPerFetch(), oldConfig.getRecordsPerFetch());
+ Assert.assertEquals(currentConfig.getFetchDelayMillis(), oldConfig.getFetchDelayMillis());
+ Assert.assertEquals(currentConfig.getAwsAssumedRoleArn(), oldConfig.getAwsAssumedRoleArn());
+ Assert.assertEquals(currentConfig.getAwsExternalId(), oldConfig.getAwsExternalId());
+ Assert.assertEquals(currentConfig.isDeaggregate(), oldConfig.isDeaggregate());
+ }
+
+ @Test
+ public void testDeserializeFromOldIoConfig() throws IOException
+ {
+ final ObjectMapper oldMapper = new DefaultObjectMapper();
+ oldMapper.registerSubtypes(new NamedType(OldKinesisIndexTaskIoConfig.class, "kinesis"));
+
+ final OldKinesisIndexTaskIoConfig oldConfig = new OldKinesisIndexTaskIoConfig(
+ "baseSequenceName",
+ new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of("1", "10L", "2", "5L")),
+ new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of("1", "20L", "2", "30L")),
+ ImmutableSet.of("1"),
+ true,
+ DateTimes.nowUtc(),
+ DateTimes.nowUtc(),
+ "endpoint",
+ 1000,
+ 2000,
+ "awsAssumedRoleArn",
+ "awsExternalId",
+ true
+ );
+
+ final byte[] json = oldMapper.writeValueAsBytes(oldConfig);
+ final KinesisIndexTaskIOConfig currentConfig = (KinesisIndexTaskIOConfig) mapper.readValue(json, IOConfig.class);
+
+ Assert.assertNull(currentConfig.getTaskGroupId());
+ Assert.assertEquals(oldConfig.getBaseSequenceName(), currentConfig.getBaseSequenceName());
+ Assert.assertEquals(
+ oldConfig.getStartPartitions().getPartitionSequenceNumberMap(),
+ currentConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap()
+ );
+ Assert.assertEquals(
+ oldConfig.getExclusiveStartSequenceNumberPartitions(),
+ currentConfig.getStartSequenceNumbers().getExclusivePartitions()
+ );
+ Assert.assertEquals(oldConfig.getEndPartitions(), currentConfig.getEndSequenceNumbers());
+ Assert.assertEquals(oldConfig.isUseTransaction(), currentConfig.isUseTransaction());
+ Assert.assertEquals(oldConfig.getMinimumMessageTime(), currentConfig.getMinimumMessageTime());
+ Assert.assertEquals(oldConfig.getMaximumMessageTime(), currentConfig.getMaximumMessageTime());
+ Assert.assertEquals(oldConfig.getEndpoint(), currentConfig.getEndpoint());
+ Assert.assertEquals(oldConfig.getRecordsPerFetch(), currentConfig.getRecordsPerFetch());
+ Assert.assertEquals(oldConfig.getFetchDelayMillis(), currentConfig.getFetchDelayMillis());
+ Assert.assertEquals(oldConfig.getAwsAssumedRoleArn(), currentConfig.getAwsAssumedRoleArn());
+ Assert.assertEquals(oldConfig.getAwsExternalId(), currentConfig.getAwsExternalId());
+ Assert.assertEquals(oldConfig.isDeaggregate(), currentConfig.isDeaggregate());
+ }
+
+ private static class OldKinesisIndexTaskIoConfig implements IOConfig
+ {
+ private final String baseSequenceName;
+ private final SeekableStreamEndSequenceNumbers<String, String> startPartitions;
+ private final SeekableStreamEndSequenceNumbers<String, String> endPartitions;
+ private final Set<String> exclusiveStartSequenceNumberPartitions;
+ private final boolean useTransaction;
+ private final Optional<DateTime> minimumMessageTime;
+ private final Optional<DateTime> maximumMessageTime;
+ private final String endpoint;
+ private final Integer recordsPerFetch;
+ private final Integer fetchDelayMillis;
+
+ private final String awsAssumedRoleArn;
+ private final String awsExternalId;
+ private final boolean deaggregate;
+
+ @JsonCreator
+ private OldKinesisIndexTaskIoConfig(
+ @JsonProperty("baseSequenceName") String baseSequenceName,
+ @JsonProperty("startPartitions") @Nullable SeekableStreamEndSequenceNumbers<String, String> startPartitions,
+ @JsonProperty("endPartitions") @Nullable SeekableStreamEndSequenceNumbers<String, String> endPartitions,
+ @JsonProperty("exclusiveStartSequenceNumberPartitions") Set<String> exclusiveStartSequenceNumberPartitions,
+ @JsonProperty("useTransaction") Boolean useTransaction,
+ @JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
+ @JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
+ @JsonProperty("endpoint") String endpoint,
+ @JsonProperty("recordsPerFetch") Integer recordsPerFetch,
+ @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
+ @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
+ @JsonProperty("awsExternalId") String awsExternalId,
+ @JsonProperty("deaggregate") boolean deaggregate
+ )
+ {
+ this.baseSequenceName = baseSequenceName;
+ this.startPartitions = startPartitions;
+ this.endPartitions = endPartitions;
+ this.exclusiveStartSequenceNumberPartitions = exclusiveStartSequenceNumberPartitions;
+ this.useTransaction = useTransaction;
+ this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
+ this.maximumMessageTime = Optional.fromNullable(maximumMessageTime);
+ this.endpoint = endpoint;
+ this.recordsPerFetch = recordsPerFetch;
+ this.fetchDelayMillis = fetchDelayMillis;
+ this.awsAssumedRoleArn = awsAssumedRoleArn;
+ this.awsExternalId = awsExternalId;
+ this.deaggregate = deaggregate;
+ }
+
+ @JsonProperty
+ public String getBaseSequenceName()
+ {
+ return baseSequenceName;
+ }
+
+ @JsonProperty
+ public SeekableStreamEndSequenceNumbers<String, String> getStartPartitions()
+ {
+ return startPartitions;
+ }
+
+ @JsonProperty
+ public SeekableStreamEndSequenceNumbers<String, String> getEndPartitions()
+ {
+ return endPartitions;
+ }
+
+ @JsonProperty
+ public Set<String> getExclusiveStartSequenceNumberPartitions()
+ {
+ return exclusiveStartSequenceNumberPartitions;
+ }
+
+ @JsonProperty
+ public boolean isUseTransaction()
+ {
+ return useTransaction;
+ }
+
+ @JsonProperty
+ public Optional<DateTime> getMinimumMessageTime()
+ {
+ return minimumMessageTime;
+ }
+
+ @JsonProperty
+ public Optional<DateTime> getMaximumMessageTime()
+ {
+ return maximumMessageTime;
+ }
+
+ @JsonProperty
+ public String getEndpoint()
+ {
+ return endpoint;
+ }
+
+ @JsonProperty
+ public int getRecordsPerFetch()
+ {
+ return recordsPerFetch;
+ }
+
+ @JsonProperty
+ public int getFetchDelayMillis()
+ {
+ return fetchDelayMillis;
+ }
+
+ @JsonProperty
+ public String getAwsAssumedRoleArn()
+ {
+ return awsAssumedRoleArn;
+ }
+
+ @JsonProperty
+ public String getAwsExternalId()
+ {
+ return awsExternalId;
+ }
+
+ @JsonProperty
+ public boolean isDeaggregate()
+ {
+ return deaggregate;
+ }
+ }
}
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index bbdd2dd..969cc39 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -399,7 +399,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask(
null,
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -469,7 +469,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask(
null,
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "0"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1")),
@@ -557,7 +557,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask(
null,
new KinesisIndexTaskIOConfig(
- null,
+ 0,
baseSequenceName,
startPartitions,
endPartitions,
@@ -683,7 +683,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask(
null,
new KinesisIndexTaskIOConfig(
- null,
+ 0,
baseSequenceName,
startPartitions,
endPartitions,
@@ -795,7 +795,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask(
null,
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -864,7 +864,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask(
null,
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -944,7 +944,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
)
),
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1015,7 +1015,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask(
null,
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2")),
@@ -1071,7 +1071,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask(
null,
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1140,7 +1140,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask(
null,
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1209,7 +1209,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask(
null,
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "5")),
@@ -1268,7 +1268,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask(
null,
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "12")),
@@ -1366,7 +1366,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask(
null,
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "9")),
@@ -1448,7 +1448,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task1 = createTask(
null,
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1466,7 +1466,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task2 = createTask(
null,
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1540,7 +1540,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task1 = createTask(
null,
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1558,7 +1558,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task2 = createTask(
null,
new KinesisIndexTaskIOConfig(
- null,
+ 1,
"sequence1",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "3"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "9")),
@@ -1630,7 +1630,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task1 = createTask(
null,
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1648,7 +1648,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task2 = createTask(
null,
new KinesisIndexTaskIOConfig(
- null,
+ 1,
"sequence1",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "3"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "9")),
@@ -1724,7 +1724,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask(
null,
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence1",
new SeekableStreamStartSequenceNumbers<>(
stream,
@@ -1808,7 +1808,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task1 = createTask(
null,
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -1826,7 +1826,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task2 = createTask(
null,
new KinesisIndexTaskIOConfig(
- null,
+ 1,
"sequence1",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "0"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1")),
@@ -1901,7 +1901,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task1 = createTask(
"task1",
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "5")),
@@ -1950,7 +1950,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task2 = createTask(
task1.getId(),
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "5")),
@@ -2034,7 +2034,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task1 = createTask(
"task1",
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "6")),
@@ -2095,7 +2095,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task2 = createTask(
task1.getId(),
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "6")),
@@ -2160,7 +2160,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTask task = createTask(
"task1",
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "13")),
@@ -2283,7 +2283,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
"task1",
DATA_SCHEMA,
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "4")),
@@ -2380,7 +2380,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
ImmutableMap.of(shardId1, "100") // simulating unlimited
);
final KinesisIndexTaskIOConfig ioConfig = new KinesisIndexTaskIOConfig(
- null,
+ 0,
baseSequenceName,
startPartitions,
endPartitions,
@@ -2493,7 +2493,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
"task1",
DATA_SCHEMA,
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequence0",
new SeekableStreamStartSequenceNumbers<>(
stream,
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 73c1d46..4b138f6 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -3550,7 +3550,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
getDataSchema(dataSource),
tuningConfig,
new KinesisIndexTaskIOConfig(
- null,
+ 0,
"sequenceName-" + taskGroupId,
startPartitions,
endPartitions,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org