You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2019/03/13 04:12:30 UTC
[incubator-druid] branch master updated: Fix record validation in
SeekableStreamIndexTaskRunner (#7246)
This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 32e86ea Fix record validation in SeekableStreamIndexTaskRunner (#7246)
32e86ea is described below
commit 32e86ea75ee27b4889d74b24847294d9b868cb6f
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Tue Mar 12 21:12:21 2019 -0700
Fix record validation in SeekableStreamIndexTaskRunner (#7246)
* Fix record validation in SeekableStreamIndexTaskRunner
* add kinesis test
---
.../indexing/kafka/LegacyKafkaIndexTaskRunner.java | 2 +-
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 42 ++++--
.../indexing/kinesis/KinesisIndexTaskTest.java | 152 ++++++++++++++++++++-
.../seekablestream/SeekableStreamIndexTask.java | 2 +-
.../SeekableStreamIndexTaskRunner.java | 66 ++++-----
.../common/OrderedPartitionableRecord.java | 2 +-
.../seekablestream/common/RecordSupplier.java | 2 +-
.../supervisor/SeekableStreamSupervisor.java | 2 +-
8 files changed, 220 insertions(+), 50 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
index c82700d..c822088 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
@@ -959,7 +959,7 @@ public class LegacyKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<In
}
@Override
- public Map<Integer, Long> getCurrentOffsets()
+ public ConcurrentMap<Integer, Long> getCurrentOffsets()
{
return nextOffsets;
}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index b19bf8a..ef77185 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -226,8 +226,6 @@ public class KafkaIndexTaskTest
private File reportsFile;
private RowIngestionMetersFactory rowIngestionMetersFactory;
- private int handoffCount = 0;
-
// This should be removed in versions greater that 0.12.x
// isIncrementalHandoffSupported should always be set to true in those later versions
@Parameterized.Parameters(name = "isIncrementalHandoffSupported = {0}")
@@ -877,7 +875,14 @@ public class KafkaIndexTaskTest
new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2011", "D", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jb("2012", "e", "y", "10", "20.0", "1.0")),
- new ProducerRecord<>(topic, 0, null, jb("2009", "B", "y", "10", "20.0", "1.0"))
+ new ProducerRecord<>(topic, 0, null, jb("2009", "B", "y", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2008", "A", "x", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2009", "B", "x", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2010", "C", "x", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2011", "D", "x", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2011", "d", "x", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2012", "E", "x", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2009", "b", "x", "10", "20.0", "1.0"))
);
final String baseSequenceName = "sequence0";
@@ -904,9 +909,13 @@ public class KafkaIndexTaskTest
topic,
ImmutableMap.of(0, 5L)
);
+ final SeekableStreamPartitions<Integer, Long> checkpoint2 = new SeekableStreamPartitions<>(
+ topic,
+ ImmutableMap.of(0, 12L)
+ );
final SeekableStreamPartitions<Integer, Long> endPartitions = new SeekableStreamPartitions<>(
topic,
- ImmutableMap.of(0, 7L)
+ ImmutableMap.of(0, Long.MAX_VALUE)
);
final KafkaIndexTask task = createTask(
@@ -927,17 +936,28 @@ public class KafkaIndexTaskTest
while (task.getRunner().getStatus() != Status.PAUSED) {
Thread.sleep(10);
}
- final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
- Assert.assertTrue(checkpoint1.getPartitionSequenceNumberMap().equals(currentOffsets));
+ Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+ Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets);
+
+ // Simulating the case when another replica has consumed up to the offset of 8
+ task.getRunner().setEndOffsets(ImmutableMap.of(0, 8L), false);
- // actual checkpoint offset is 5, but simulating behavior of publishing set end offset call, to ensure this task
- // will continue reading through the end offset of the checkpointed sequence
- task.getRunner().setEndOffsets(ImmutableMap.of(0, 6L), true);
+ // The task is supposed to consume remaining rows up to the offset of 13
+ while (task.getRunner().getStatus() != Status.PAUSED) {
+ Thread.sleep(10);
+ }
+ currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+ Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets);
+
+ task.getRunner().setEndOffsets(
+ ImmutableMap.of(0, task.getRunner().getCurrentOffsets().get(0) + 1L),
+ true
+ );
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
- // processed count would be 5 if it stopped at it's current offsets
- Assert.assertEquals(6, task.getRunner().getRowIngestionMeters().getProcessed());
+ // processed count would be 8 if it stopped at it's current offsets
+ Assert.assertEquals(13, task.getRunner().getRowIngestionMeters().getProcessed());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
}
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 78ce481..98a3bee 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -176,6 +176,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -639,7 +640,6 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc7));
}
-
@Test(timeout = 120_000L)
public void testIncrementalHandOffMaxTotalRows() throws Exception
{
@@ -2277,7 +2277,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
verifyAll();
- Map<String, String> currentOffsets = task.getRunner().getCurrentOffsets();
+ ConcurrentMap<String, String> currentOffsets = task.getRunner().getCurrentOffsets();
try {
future.get(10, TimeUnit.SECONDS);
@@ -2423,6 +2423,154 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
}
+ @Test(timeout = 5000L)
+ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
+ {
+ final List<OrderedPartitionableRecord<String, String>> records = ImmutableList.of(
+ new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", "y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", "y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", "y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "3", jb("2011", "d", "y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "4", jb("2011", "e", "y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "5", jb("2012", "a", "y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "6", jb("2013", "b", "y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "7", jb("2010", "c", "y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "8", jb("2011", "d", "y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "9", jb("2011", "e", "y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "10", jb("2008", "a", "y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "11", jb("2009", "b", "y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "12", jb("2010", "c", "y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "13", jb("2012", "d", "y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "14", jb("2013", "e", "y", "10", "20.0", "1.0"))
+ );
+
+ final String baseSequenceName = "sequence0";
+ // as soon as any segment has more than one record, incremental publishing should happen
+ maxRowsPerSegment = 2;
+ maxRecordsPerPoll = 1;
+
+ recordSupplier.assign(anyObject());
+ expectLastCall().anyTimes();
+
+ expect(recordSupplier.getEarliestSequenceNumber(anyObject())).andReturn("0").anyTimes();
+
+ recordSupplier.seek(anyObject(), anyString());
+ expectLastCall().anyTimes();
+
+ expect(recordSupplier.poll(anyLong())).andReturn(records.subList(0, 5))
+ .once()
+ .andReturn(records.subList(4, 10))
+ .once()
+ .andReturn(records.subList(9, 15))
+ .once();
+
+ recordSupplier.close();
+ expectLastCall().once();
+
+ replayAll();
+
+ final SeekableStreamPartitions<String, String> startPartitions = new SeekableStreamPartitions<>(
+ stream,
+ ImmutableMap.of(
+ shardId1,
+ "0"
+ )
+ );
+
+ final SeekableStreamPartitions<String, String> checkpoint1 = new SeekableStreamPartitions<>(
+ stream,
+ ImmutableMap.of(
+ shardId1,
+ "4"
+ )
+ );
+
+ final SeekableStreamPartitions<String, String> checkpoint2 = new SeekableStreamPartitions<>(
+ stream,
+ ImmutableMap.of(
+ shardId1,
+ "9"
+ )
+ );
+
+ final SeekableStreamPartitions<String, String> endPartitions = new SeekableStreamPartitions<>(
+ stream,
+ ImmutableMap.of(
+ shardId1,
+ "14"
+ )
+ );
+ final KinesisIndexTask task = createTask(
+ null,
+ new KinesisIndexTaskIOConfig(
+ null,
+ baseSequenceName,
+ startPartitions,
+ endPartitions,
+ true,
+ null,
+ null,
+ "awsEndpoint",
+ null,
+ null,
+ null,
+ null,
+ null,
+ false
+ )
+ );
+ final ListenableFuture<TaskStatus> future = runTask(task);
+ while (task.getRunner().getStatus() != SeekableStreamIndexTaskRunner.Status.PAUSED) {
+ Thread.sleep(10);
+ }
+ Map<String, String> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+ Assert.assertEquals(checkpoint1.getPartitionSequenceNumberMap(), currentOffsets);
+ task.getRunner().setEndOffsets(currentOffsets, false);
+
+ // The task is supposed to consume remaining rows up to the offset of 13
+ while (task.getRunner().getStatus() != Status.PAUSED) {
+ Thread.sleep(10);
+ }
+ currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
+ Assert.assertEquals(checkpoint2.getPartitionSequenceNumberMap(), currentOffsets);
+
+ task.getRunner().setEndOffsets(
+ ImmutableMap.of(shardId1, String.valueOf(Long.valueOf(task.getRunner().getCurrentOffsets().get(shardId1)) + 1)),
+ true
+ );
+
+ Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+
+ verifyAll();
+
+ Assert.assertEquals(2, checkpointRequestsHash.size());
+
+ // Check metrics
+ Assert.assertEquals(12, task.getRunner().getRowIngestionMeters().getProcessed());
+ Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getUnparseable());
+ Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
+
+ // Check published metadata
+ final Set<SegmentDescriptor> descriptors = new HashSet<>();
+ descriptors.add(sd(task, "2008/P1D", 0));
+ descriptors.add(sd(task, "2008/P1D", 1));
+ descriptors.add(sd(task, "2009/P1D", 0));
+ descriptors.add(sd(task, "2010/P1D", 0));
+ descriptors.add(sd(task, "2010/P1D", 1));
+ descriptors.add(sd(task, "2011/P1D", 0));
+ descriptors.add(sd(task, "2011/P1D", 1));
+ descriptors.add(sd(task, "2012/P1D", 0));
+ descriptors.add(sd(task, "2013/P1D", 0));
+ Assert.assertEquals(descriptors, publishedDescriptors());
+ Assert.assertEquals(
+ new KinesisDataSourceMetadata(new SeekableStreamPartitions<>(stream, ImmutableMap.of(
+ shardId1,
+ "10"
+ ))),
+ metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
+ );
+ }
+
private ListenableFuture<TaskStatus> runTask(final Task task)
{
try {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index d4659bf..833cc17 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -61,7 +61,7 @@ import java.nio.ByteBuffer;
import java.util.Map;
-public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType extends Comparable>
+public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>
extends AbstractTask implements ChatHandler
{
public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index b2b906e..7eee9dc 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -130,7 +130,7 @@ import java.util.stream.Collectors;
* @param <PartitionIdType> Partition Number Type
* @param <SequenceOffsetType> Sequence Number Type
*/
-public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType extends Comparable> implements ChatHandler
+public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> implements ChatHandler
{
public enum Status
{
@@ -196,7 +196,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
private final List<ListenableFuture<SegmentsAndMetadata>> publishWaitList = new ArrayList<>();
private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList = new ArrayList<>();
- private final Map<PartitionIdType, SequenceOffsetType> initialOffsetsSnapshot = new HashMap<>();
+ private final Set<PartitionIdType> initialOffsetsSnapshot = new HashSet<>();
private final Set<PartitionIdType> exclusiveStartingPartitions = new HashSet<>();
private volatile DateTime startTime;
@@ -454,7 +454,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
status = Status.READING;
Throwable caughtExceptionInner = null;
- initialOffsetsSnapshot.putAll(currOffsets);
+ initialOffsetsSnapshot.addAll(currOffsets.keySet());
exclusiveStartingPartitions.addAll(ioConfig.getExclusiveStartSequenceNumberPartitions());
try {
@@ -490,7 +490,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
maybePersistAndPublishSequences(committerSupplier);
-
// calling getRecord() ensures that exceptions specific to kafka/kinesis like OffsetOutOfRangeException
// are handled in the subclasses.
List<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType>> records = getRecords(
@@ -512,9 +511,9 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
continue;
}
- // for the first message we receive, check that we were given a message with a sequenceNumber that matches our
- // expected starting sequenceNumber
- if (!verifyInitialRecordAndSkipExclusivePartition(record, initialOffsetsSnapshot)) {
+ // for the first message we receive, check that we were given a message with a sequenceNumber that matches
+ // our expected starting sequenceNumber
+ if (!verifyInitialRecordAndSkipExclusivePartition(record)) {
continue;
}
@@ -1281,7 +1280,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
return getCurrentOffsets();
}
- public Map<PartitionIdType, SequenceOffsetType> getCurrentOffsets()
+ public ConcurrentMap<PartitionIdType, SequenceOffsetType> getCurrentOffsets()
{
return currOffsets;
}
@@ -1384,14 +1383,15 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
// do not mark the starting sequence number as exclusive
Set<PartitionIdType> exclusivePartitions = sequenceNumbers.keySet()
.stream()
- .filter(x -> !initialOffsetsSnapshot.containsKey(x)
+ .filter(x -> !initialOffsetsSnapshot.contains(x)
|| ioConfig.getExclusiveStartSequenceNumberPartitions()
.contains(x))
.collect(Collectors.toSet());
- if ((latestSequence.getStartOffsets().equals(sequenceNumbers) && latestSequence.exclusiveStartPartitions.equals(
- exclusivePartitions) && !finish) ||
- (latestSequence.getEndOffsets().equals(sequenceNumbers) && finish)) {
+ if ((latestSequence.getStartOffsets().equals(sequenceNumbers)
+ && latestSequence.exclusiveStartPartitions.equals(exclusivePartitions)
+ && !finish)
+ || (latestSequence.getEndOffsets().equals(sequenceNumbers) && finish)) {
log.warn("Ignoring duplicate request, end sequences already set for sequences [%s]", sequenceNumbers);
resume();
return Response.ok(sequenceNumbers).build();
@@ -1442,7 +1442,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
exclusivePartitions
);
sequences.add(newSequence);
- initialOffsetsSnapshot.putAll(sequenceNumbers);
+ initialOffsetsSnapshot.addAll(sequenceNumbers.keySet());
}
persistSequences();
}
@@ -1882,33 +1882,35 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
private boolean verifyInitialRecordAndSkipExclusivePartition(
- final OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record,
- final Map<PartitionIdType, SequenceOffsetType> intialSequenceSnapshot
+ final OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record
)
{
- if (intialSequenceSnapshot.containsKey(record.getPartitionId())) {
- if (record.getSequenceNumber().compareTo(intialSequenceSnapshot.get(record.getPartitionId())) < 0) {
- throw new ISE(
- "Starting sequenceNumber [%s] does not match expected [%s] for partition [%s]",
- record.getSequenceNumber(),
- intialSequenceSnapshot.get(record.getPartitionId()),
- record.getPartitionId()
+ // Check only for the first record among the record batch.
+ if (initialOffsetsSnapshot.contains(record.getPartitionId())) {
+ final SequenceOffsetType currOffset = currOffsets.get(record.getPartitionId());
+ if (currOffset != null) {
+ final OrderedSequenceNumber<SequenceOffsetType> recordSequenceNumber = createSequenceNumber(
+ record.getSequenceNumber()
+ );
+ final OrderedSequenceNumber<SequenceOffsetType> currentSequenceNumber = createSequenceNumber(
+ currOffset
);
+ if (recordSequenceNumber.compareTo(currentSequenceNumber) < 0) {
+ throw new ISE(
+ "sequenceNumber of the start record[%s] is smaller than current sequenceNumber[%s] for partition [%s]",
+ record.getSequenceNumber(),
+ currOffset,
+ record.getPartitionId()
+ );
+ }
}
- log.info(
- "Verified starting sequenceNumber [%s] for partition [%s]",
- record.getSequenceNumber(), record.getPartitionId()
- );
-
- intialSequenceSnapshot.remove(record.getPartitionId());
- if (intialSequenceSnapshot.isEmpty()) {
- log.info("Verified starting sequences for all partitions");
- }
+ // Remove the mark to notify that this partition has been read.
+ initialOffsetsSnapshot.remove(record.getPartitionId());
// check exclusive starting sequence
if (isStartingSequenceOffsetsExclusive() && exclusiveStartingPartitions.contains(record.getPartitionId())) {
- log.info("Skipping starting sequenceNumber for partition [%s] marked exclusive", record.getPartitionId());
+ log.warn("Skipping starting sequenceNumber for partition [%s] marked exclusive", record.getPartitionId());
return false;
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
index a122d9e..4dd653e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
@@ -35,7 +35,7 @@ import java.util.stream.Collectors;
* @param <PartitionIdType> partition id
* @param <SequenceOffsetType> sequence number
*/
-public class OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType extends Comparable>
+public class OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType>
{
private final String stream;
private final PartitionIdType partitionId;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
index 3a6e87e..d9e599d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
@@ -36,7 +36,7 @@ import java.util.Set;
* @param <SequenceOffsetType> Sequence Number Type
*/
@Beta
-public interface RecordSupplier<PartitionIdType, SequenceOffsetType extends Comparable> extends Closeable
+public interface RecordSupplier<PartitionIdType, SequenceOffsetType> extends Closeable
{
/**
* assigns the given partitions to this RecordSupplier
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 4c6509d..25250ac 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -119,7 +119,7 @@ import java.util.stream.Stream;
* @param <PartitionIdType> the type of the partition id, for example, partitions in Kafka are int type while partitions in Kinesis are String type
* @param <SequenceOffsetType> the type of the sequence number or offsets, for example, Kafka uses long offsets while Kinesis uses String sequence numbers
*/
-public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType extends Comparable>
+public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>
implements Supervisor
{
public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = "IS_INCREMENTAL_HANDOFF_SUPPORTED";
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org