Posted to commits@druid.apache.org by jo...@apache.org on 2019/03/27 00:16:35 UTC
[incubator-druid] branch 0.14.0-incubating updated: Fix exclusive start partitions for sequenceMetadata (#7339) (#7347)
This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch 0.14.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.14.0-incubating by this push:
new 1a29d4e Fix exclusive start partitions for sequenceMetadata (#7339) (#7347)
1a29d4e is described below
commit 1a29d4e88349a002ec8356e64281a661d47271f1
Author: Jonathan Wei <jo...@users.noreply.github.com>
AuthorDate: Tue Mar 26 17:16:27 2019 -0700
Fix exclusive start partitions for sequenceMetadata (#7339) (#7347)
* Fix exclusive start partitions for sequenceMetadata
* add empty check
---
.../indexing/kafka/supervisor/KafkaSupervisor.java | 25 ++--
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 76 ++++--------
.../kinesis/supervisor/KinesisSupervisor.java | 25 ++--
.../indexing/kinesis/KinesisIndexTaskTest.java | 117 +++++++++++++++---
.../actions/SegmentTransactionalInsertAction.java | 5 +-
.../SeekableStreamIndexTaskRunner.java | 136 +++++++++++++++------
.../indexing/seekablestream/SequenceMetadata.java | 7 +-
.../supervisor/SeekableStreamSupervisor.java | 25 +++-
.../supervisor/SeekableStreamSupervisorSpec.java | 5 +-
.../IndexerSQLMetadataStorageCoordinator.java | 7 +-
10 files changed, 272 insertions(+), 156 deletions(-)
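
For orientation before the per-file hunks: both supervisors below now share a CHECKPOINTS_TYPE_REF constant describing the checkpoint payload, a map from sequence id to that sequence's starting partition offsets. A minimal, runnable Jackson sketch (the offsets are made up for illustration) of the JSON shape that ends up in the task context:

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

public class CheckpointsShapeExample
{
  public static void main(String[] args) throws Exception
  {
    // Sequence id -> (partition -> starting offset), Kafka-style types.
    final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
    checkpoints.put(0, Collections.singletonMap(0, 0L)); // first sequence starts at offset 0
    checkpoints.put(1, Collections.singletonMap(0, 2L)); // restart point after one incremental handoff

    final String json = new ObjectMapper()
        .writerFor(new TypeReference<TreeMap<Integer, Map<Integer, Long>>>() {})
        .writeValueAsString(checkpoints);
    System.out.println(json); // {"0":{"0":0},"1":{"0":2}}
  }
}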
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 86639fe..bbc1639 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
@@ -80,6 +79,11 @@ import java.util.stream.Collectors;
*/
public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
{
+ public static final TypeReference<TreeMap<Integer, Map<Integer, Long>>> CHECKPOINTS_TYPE_REF =
+ new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
+ {
+ };
+
private static final EmittingLogger log = new EmittingLogger(KafkaSupervisor.class);
private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000;
private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000;
@@ -230,20 +234,9 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
RowIngestionMetersFactory rowIngestionMetersFactory
) throws JsonProcessingException
{
- final String checkpoints = sortingMapper.writerWithType(new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
- {
- }).writeValueAsString(sequenceOffsets);
- final Map<String, Object> context = spec.getContext() == null
- ? ImmutableMap.of(
- "checkpoints",
- checkpoints,
- IS_INCREMENTAL_HANDOFF_SUPPORTED,
- true
- ) : ImmutableMap.<String, Object>builder()
- .put("checkpoints", checkpoints)
- .put(IS_INCREMENTAL_HANDOFF_SUPPORTED, true)
- .putAll(spec.getContext())
- .build();
+ final String checkpoints = sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
+ final Map<String, Object> context = createBaseTaskContexts();
+ context.put(CHECKPOINTS_CTX_KEY, checkpoints);
List<SeekableStreamIndexTask<Integer, Long>> taskList = new ArrayList<>();
for (int i = 0; i < replicas; i++) {
@@ -359,7 +352,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long>
}
@Override
- protected boolean useExclusiveStartSequenceNumberForStartSequence()
+ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
{
return false;
}
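
The hunks above hoist the anonymous TypeReference into the shared CHECKPOINTS_TYPE_REF and swap the hard-coded "checkpoints" string for SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY (introduced further down). A hedged sketch of the resulting round trip, with the surrounding plumbing elided:

// Supervisor side (as in the new task-creation body above): serialize the
// checkpoint map under the shared context key.
final String checkpoints = sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
final Map<String, Object> context = createBaseTaskContexts();
context.put(CHECKPOINTS_CTX_KEY, checkpoints);

// Task side (as in SeekableStreamIndexTaskRunner.initializeSequences below):
// read the same key back and rebuild the checkpoint map.
final TreeMap<Integer, Map<Integer, Long>> restored = getCheckPointsFromContext(
    toolbox,
    task.getContextValue(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)
);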
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 5a3f7dd..f11599f 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.kafka;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -68,6 +69,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskTest;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.kafka.test.TestBroker;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
@@ -2217,9 +2219,10 @@ public class KafkaIndexTaskTest
// and this task should start reading from offset 2 for partition 0
sequences.put(1, ImmutableMap.of(0, 2L));
final Map<String, Object> context = new HashMap<>();
- context.put("checkpoints", objectMapper.writerWithType(new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
- {
- }).writeValueAsString(sequences));
+ context.put(
+ SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY,
+ objectMapper.writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF).writeValueAsString(sequences)
+ );
final KafkaIndexTask task = createTask(
null,
@@ -2404,7 +2407,7 @@ public class KafkaIndexTaskTest
private KafkaIndexTask createTask(
final String taskId,
final KafkaIndexTaskIOConfig ioConfig
- )
+ ) throws JsonProcessingException
{
return createTask(taskId, DATA_SCHEMA, ioConfig);
}
@@ -2413,7 +2416,7 @@ public class KafkaIndexTaskTest
final String taskId,
final KafkaIndexTaskIOConfig ioConfig,
final Map<String, Object> context
- )
+ ) throws JsonProcessingException
{
return createTask(taskId, DATA_SCHEMA, ioConfig, context);
}
@@ -2422,62 +2425,24 @@ public class KafkaIndexTaskTest
final String taskId,
final DataSchema dataSchema,
final KafkaIndexTaskIOConfig ioConfig
- )
+ ) throws JsonProcessingException
{
- final KafkaIndexTaskTuningConfig tuningConfig = new KafkaIndexTaskTuningConfig(
- 1000,
- null,
- maxRowsPerSegment,
- maxTotalRows,
- new Period("P1Y"),
- null,
- null,
- null,
- true,
- reportParseExceptions,
- handoffConditionTimeout,
- resetOffsetAutomatically,
- null,
- intermediateHandoffPeriod,
- logParseExceptions,
- maxParseExceptions,
- maxSavedParseExceptions
- );
- final Map<String, Object> context = isIncrementalHandoffSupported
- ? ImmutableMap.of(
- SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED,
- true
- )
- : null;
- final KafkaIndexTask task = new KafkaIndexTask(
- taskId,
- null,
- cloneDataSchema(dataSchema),
- tuningConfig,
- ioConfig,
- context,
- null,
- null,
- rowIngestionMetersFactory,
- objectMapper
- );
- task.setPollRetryMs(POLL_RETRY_MS);
- return task;
+ final Map<String, Object> context = new HashMap<>();
+ return createTask(taskId, dataSchema, ioConfig, context);
}
-
private KafkaIndexTask createTask(
final String taskId,
final DataSchema dataSchema,
final KafkaIndexTaskIOConfig ioConfig,
final Map<String, Object> context
- )
+ ) throws JsonProcessingException
{
final KafkaIndexTaskTuningConfig tuningConfig = new KafkaIndexTaskTuningConfig(
1000,
null,
maxRowsPerSegment,
- null,
+ maxTotalRows,
new Period("P1Y"),
null,
null,
@@ -2487,13 +2452,22 @@ public class KafkaIndexTaskTest
handoffConditionTimeout,
resetOffsetAutomatically,
null,
- null,
+ intermediateHandoffPeriod,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions
);
if (isIncrementalHandoffSupported) {
context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
+
+ if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
+ final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
+ checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
+ final String checkpointsJson = objectMapper
+ .writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF)
+ .writeValueAsString(checkpoints);
+ context.put(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY, checkpointsJson);
+ }
}
final KafkaIndexTask task = new KafkaIndexTask(
@@ -2562,8 +2536,8 @@ public class KafkaIndexTaskTest
objectMapper.registerModule(module);
}
final TaskConfig taskConfig = new TaskConfig(
- new File(directory, "taskBaseDir").getPath(),
- null,
+ new File(directory, "baseDir").getPath(),
+ new File(directory, "baseTaskDir").getPath(),
null,
50000,
null,
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 332e693..870d7ec 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -75,6 +75,11 @@ import java.util.concurrent.ScheduledExecutorService;
*/
public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
{
+ public static final TypeReference<TreeMap<Integer, Map<String, String>>> CHECKPOINTS_TYPE_REF =
+ new TypeReference<TreeMap<Integer, Map<String, String>>>()
+ {
+ };
+
private static final String NOT_SET = "-1";
private final KinesisSupervisorSpec spec;
private final AWSCredentialsConfig awsCredentialsConfig;
@@ -151,20 +156,10 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
RowIngestionMetersFactory rowIngestionMetersFactory
) throws JsonProcessingException
{
- final String checkpoints = sortingMapper.writerFor(new TypeReference<TreeMap<Integer, Map<String, String>>>()
- {
- }).writeValueAsString(sequenceOffsets);
- final Map<String, Object> context = spec.getContext() == null
- ? ImmutableMap.of(
- "checkpoints",
- checkpoints,
- IS_INCREMENTAL_HANDOFF_SUPPORTED,
- true
- ) : ImmutableMap.<String, Object>builder()
- .put("checkpoints", checkpoints)
- .put(IS_INCREMENTAL_HANDOFF_SUPPORTED, true)
- .putAll(spec.getContext())
- .build();
+ final String checkpoints = sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
+ final Map<String, Object> context = createBaseTaskContexts();
+ context.put(CHECKPOINTS_CTX_KEY, checkpoints);
+
List<SeekableStreamIndexTask<String, String>> taskList = new ArrayList<>();
for (int i = 0; i < replicas; i++) {
String taskId = Joiner.on("_").join(baseSequenceName, RandomIdUtils.getRandomId());
@@ -313,7 +308,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
}
@Override
- protected boolean useExclusiveStartSequenceNumberForStartSequence()
+ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
{
return true;
}
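
KafkaSupervisor's override above returns false while this Kinesis one returns true; the renamed hook is consumed when the supervisor seeds offsets for a task group that continues from a previous one (this mirrors a SeekableStreamSupervisor hunk shown later, with the surrounding plumbing elided):

// Kinesis sequence numbers are inclusive on both ends, so an offset carried
// over from a prior task group has already been read and must be exclusive
// (true). Kafka end offsets are already exclusive, so the carried-over offset
// is exactly the next record to read (false).
if (!getNotSetMarker().equals(sequence) && !isEndOfShard(sequence)) {
  builder.put(partition, makeSequenceNumber(sequence, useExclusiveStartSequenceNumberForNonFirstSequence()));
}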
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 127b3d9..0fbf59a 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
@@ -74,6 +75,7 @@ import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTaskTest;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.MetadataTaskStorage;
@@ -83,6 +85,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.SequenceMetadata;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
@@ -181,6 +184,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -512,7 +516,6 @@ public class KinesisIndexTaskTest extends EasyMockSupport
Assert.assertEquals(ImmutableList.of("g"), readSegmentColumn("dim1", desc2));
}
-
@Test(timeout = 120_000L)
public void testIncrementalHandOff() throws Exception
{
@@ -739,7 +742,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
DATA_SCHEMA.getDataSource(),
0,
new KinesisDataSourceMetadata(
- new SeekableStreamStartSequenceNumbers<>(stream, currentOffsets, ImmutableSet.of())
+ new SeekableStreamStartSequenceNumbers<>(stream, currentOffsets, currentOffsets.keySet())
),
new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(stream, nextOffsets))
)
@@ -1952,7 +1955,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
new KinesisIndexTaskIOConfig(
null,
"sequence0",
- new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of(shardId1)),
+ new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "5")),
true,
null,
@@ -2050,10 +2053,9 @@ public class KinesisIndexTaskTest extends EasyMockSupport
)
);
- final SeekableStreamStartSequenceNumbers<String, String> checkpoint1 = new SeekableStreamStartSequenceNumbers<>(
+ final SeekableStreamEndSequenceNumbers<String, String> checkpoint1 = new SeekableStreamEndSequenceNumbers<>(
stream,
- ImmutableMap.of(shardId1, "4"),
- ImmutableSet.of()
+ ImmutableMap.of(shardId1, "4")
);
final ListenableFuture<TaskStatus> future1 = runTask(task1);
@@ -2098,7 +2100,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
new KinesisIndexTaskIOConfig(
null,
"sequence0",
- new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of(shardId1)),
+ new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "0"), ImmutableSet.of()),
new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "6")),
true,
null,
@@ -2274,9 +2276,10 @@ public class KinesisIndexTaskTest extends EasyMockSupport
// and this task should start reading from offset 2 for partition 0 (not offset 1, because end is inclusive)
sequences.put(1, ImmutableMap.of(shardId1, "1"));
final Map<String, Object> context = new HashMap<>();
- context.put("checkpoints", objectMapper.writerWithType(new TypeReference<TreeMap<Integer, Map<String, String>>>()
- {
- }).writeValueAsString(sequences));
+ context.put(
+ SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY,
+ objectMapper.writerFor(KinesisSupervisor.CHECKPOINTS_TYPE_REF).writeValueAsString(sequences)
+ );
final KinesisIndexTask task = createTask(
@@ -2476,6 +2479,75 @@ public class KinesisIndexTaskTest extends EasyMockSupport
);
}
+ @Test
+ public void testSequencesFromContext() throws IOException
+ {
+ final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
+ // Three checkpoint entries simulate two incremental handoffs taken by a previous (failed) task;
+ // the restored task should rebuild one sequence per entry, with the matching exclusive start partitions
+ checkpoints.put(0, ImmutableMap.of(shardId0, "0", shardId1, "0"));
+ checkpoints.put(1, ImmutableMap.of(shardId0, "0", shardId1, "1"));
+ checkpoints.put(2, ImmutableMap.of(shardId0, "1", shardId1, "3"));
+ final Map<String, Object> context = new HashMap<>();
+ context.put("checkpoints", objectMapper.writerFor(KinesisSupervisor.CHECKPOINTS_TYPE_REF)
+ .writeValueAsString(checkpoints));
+
+ final KinesisIndexTask task = createTask(
+ "task1",
+ DATA_SCHEMA,
+ new KinesisIndexTaskIOConfig(
+ null,
+ "sequence0",
+ new SeekableStreamStartSequenceNumbers<>(
+ stream,
+ ImmutableMap.of(shardId0, "0", shardId1, "0"),
+ ImmutableSet.of(shardId0)
+ ),
+ new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1", shardId1, "5")),
+ true,
+ null,
+ null,
+ "awsEndpoint",
+ null,
+ null,
+ null,
+ null,
+ false
+ ),
+ context
+ );
+
+ task.getRunner().setToolbox(toolboxFactory.build(task));
+ task.getRunner().initializeSequences();
+ final CopyOnWriteArrayList<SequenceMetadata<String, String>> sequences = task.getRunner().getSequences();
+
+ Assert.assertEquals(3, sequences.size());
+
+ SequenceMetadata<String, String> sequenceMetadata = sequences.get(0);
+ Assert.assertEquals(checkpoints.get(0), sequenceMetadata.getStartOffsets());
+ Assert.assertEquals(checkpoints.get(1), sequenceMetadata.getEndOffsets());
+ Assert.assertEquals(
+ task.getIOConfig().getStartSequenceNumbers().getExclusivePartitions(),
+ sequenceMetadata.getExclusiveStartPartitions()
+ );
+ Assert.assertTrue(sequenceMetadata.isCheckpointed());
+
+ sequenceMetadata = sequences.get(1);
+ Assert.assertEquals(checkpoints.get(1), sequenceMetadata.getStartOffsets());
+ Assert.assertEquals(checkpoints.get(2), sequenceMetadata.getEndOffsets());
+ Assert.assertEquals(checkpoints.get(1).keySet(), sequenceMetadata.getExclusiveStartPartitions());
+ Assert.assertTrue(sequenceMetadata.isCheckpointed());
+
+ sequenceMetadata = sequences.get(2);
+ Assert.assertEquals(checkpoints.get(2), sequenceMetadata.getStartOffsets());
+ Assert.assertEquals(
+ task.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap(),
+ sequenceMetadata.getEndOffsets()
+ );
+ Assert.assertEquals(checkpoints.get(2).keySet(), sequenceMetadata.getExclusiveStartPartitions());
+ Assert.assertFalse(sequenceMetadata.isCheckpointed());
+ }
+
private ListenableFuture<TaskStatus> runTask(final Task task)
{
try {
@@ -2525,7 +2597,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
private KinesisIndexTask createTask(
final String taskId,
final KinesisIndexTaskIOConfig ioConfig
- )
+ ) throws JsonProcessingException
{
return createTask(taskId, DATA_SCHEMA, ioConfig, null);
}
@@ -2534,7 +2606,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final String taskId,
final DataSchema dataSchema,
final KinesisIndexTaskIOConfig ioConfig
- )
+ ) throws JsonProcessingException
{
return createTask(taskId, dataSchema, ioConfig, null);
}
@@ -2544,7 +2616,7 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final DataSchema dataSchema,
final KinesisIndexTaskIOConfig ioConfig,
@Nullable final Map<String, Object> context
- )
+ ) throws JsonProcessingException
{
final KinesisIndexTaskTuningConfig tuningConfig = new KinesisIndexTaskTuningConfig(
maxRowsInMemory,
@@ -2581,13 +2653,22 @@ public class KinesisIndexTaskTest extends EasyMockSupport
final KinesisIndexTaskIOConfig ioConfig,
final KinesisIndexTaskTuningConfig tuningConfig,
@Nullable final Map<String, Object> context
- )
+ ) throws JsonProcessingException
{
if (context != null) {
context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
+
+ if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
+ final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
+ checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
+ final String checkpointsJson = objectMapper
+ .writerFor(KinesisSupervisor.CHECKPOINTS_TYPE_REF)
+ .writeValueAsString(checkpoints);
+ context.put(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY, checkpointsJson);
+ }
}
- final KinesisIndexTask task = new TestableKinesisIndexTask(
+ return new TestableKinesisIndexTask(
taskId,
null,
cloneDataSchema(dataSchema),
@@ -2599,8 +2680,6 @@ public class KinesisIndexTaskTest extends EasyMockSupport
rowIngestionMetersFactory,
null
);
-
- return task;
}
private static DataSchema cloneDataSchema(final DataSchema dataSchema)
@@ -2660,8 +2739,8 @@ public class KinesisIndexTaskTest extends EasyMockSupport
objectMapper.registerModule(module);
}
final TaskConfig taskConfig = new TaskConfig(
- new File(directory, "taskBaseDir").getPath(),
- null,
+ new File(directory, "baseDir").getPath(),
+ new File(directory, "baseTaskDir").getPath(),
null,
50000,
null,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
index 8a3c713..3a0aa76 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
@@ -154,8 +153,8 @@ public class SegmentTransactionalInsertAction implements TaskAction<SegmentPubli
@Override
public String toString()
{
- return "SegmentInsertAction{" +
- "segments=" + Iterables.transform(segments, DataSegment::getId) +
+ return "SegmentTransactionalInsertAction{" +
+ "segments=" + segments +
", startMetadata=" + startMetadata +
", endMetadata=" + endMetadata +
'}';
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 3907d54..c07118a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -59,6 +59,7 @@ import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecor
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@@ -239,7 +240,6 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
resetNextCheckpointTime();
}
-
public TaskStatus run(TaskToolbox toolbox)
{
try {
@@ -256,57 +256,90 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
}
- private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
+ private Set<PartitionIdType> computeExclusiveStartPartitionsForSequence(
+ Map<PartitionIdType, SequenceOffsetType> sequenceStartOffsets
+ )
+ {
+ if (sequenceStartOffsets.equals(ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap())) {
+ return ioConfig.getStartSequenceNumbers().getExclusivePartitions();
+ } else {
+ return isEndOffsetExclusive() ? Collections.emptySet() : sequenceStartOffsets.keySet();
+ }
+ }
+
+ @VisibleForTesting
+ public void setToolbox(TaskToolbox toolbox)
{
- log.info("SeekableStream indexing task starting up!");
- startTime = DateTimes.nowUtc();
- status = Status.STARTING;
this.toolbox = toolbox;
+ }
+ @VisibleForTesting
+ public void initializeSequences() throws IOException
+ {
if (!restoreSequences()) {
final TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>> checkpoints = getCheckPointsFromContext(
toolbox,
- task.getContextValue("checkpoints")
+ task.getContextValue(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)
);
if (checkpoints != null) {
- boolean exclusive = false;
Iterator<Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>>> sequenceOffsets = checkpoints.entrySet()
.iterator();
Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> previous = sequenceOffsets.next();
while (sequenceOffsets.hasNext()) {
Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> current = sequenceOffsets.next();
- addSequence(new SequenceMetadata<>(
- previous.getKey(),
- StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
- previous.getValue(),
- current.getValue(),
- true,
- exclusive ? previous.getValue().keySet() : null
- ));
+ final Set<PartitionIdType> exclusiveStartPartitions = computeExclusiveStartPartitionsForSequence(
+ previous.getValue()
+ );
+ addSequence(
+ new SequenceMetadata<>(
+ previous.getKey(),
+ StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
+ previous.getValue(),
+ current.getValue(),
+ true,
+ exclusiveStartPartitions
+ )
+ );
previous = current;
- exclusive = true;
}
- addSequence(new SequenceMetadata<>(
- previous.getKey(),
- StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
- previous.getValue(),
- endOffsets,
- false,
- exclusive ? previous.getValue().keySet() : null
- ));
+ final Set<PartitionIdType> exclusiveStartPartitions = computeExclusiveStartPartitionsForSequence(
+ previous.getValue()
+ );
+ addSequence(
+ new SequenceMetadata<>(
+ previous.getKey(),
+ StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
+ previous.getValue(),
+ endOffsets,
+ false,
+ exclusiveStartPartitions
+ )
+ );
} else {
- addSequence(new SequenceMetadata<>(
- 0,
- StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
- ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap(),
- endOffsets,
- false,
- null
- ));
+ addSequence(
+ new SequenceMetadata<>(
+ 0,
+ StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
+ ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap(),
+ endOffsets,
+ false,
+ ioConfig.getStartSequenceNumbers().getExclusivePartitions()
+ )
+ );
}
}
log.info("Starting with sequences: %s", sequences);
+ }
+
+ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
+ {
+ log.info("SeekableStream indexing task starting up!");
+ startTime = DateTimes.nowUtc();
+ status = Status.STARTING;
+
+ setToolbox(toolbox);
+ initializeSequences();
if (chatHandlerProvider.isPresent()) {
log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName());
@@ -392,10 +425,10 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
// sequences size can be 0 only when all sequences got published and task stopped before it could finish
// which is super rare
- if (sequences.size() == 0 || sequences.get(sequences.size() - 1).isCheckpointed()) {
+ if (sequences.size() == 0 || getLastSequenceMetadata().isCheckpointed()) {
this.endOffsets.putAll(sequences.size() == 0
? currOffsets
- : sequences.get(sequences.size() - 1).getEndOffsets());
+ : getLastSequenceMetadata().getEndOffsets());
log.info("End sequences changed to [%s]", endOffsets);
}
}
@@ -479,7 +512,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
// if stop is requested or task's end sequence is set by call to setEndOffsets method with finish set to true
- if (stopRequested.get() || sequences.size() == 0 || sequences.get(sequences.size() - 1).isCheckpointed()) {
+ if (stopRequested.get() || sequences.size() == 0 || getLastSequenceMetadata().isCheckpointed()) {
status = Status.PUBLISHING;
}
@@ -628,12 +661,12 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
if (System.currentTimeMillis() > nextCheckpointTime) {
- sequenceToCheckpoint = sequences.get(sequences.size() - 1);
+ sequenceToCheckpoint = getLastSequenceMetadata();
}
if (sequenceToCheckpoint != null && stillReading) {
Preconditions.checkArgument(
- sequences.get(sequences.size() - 1)
+ getLastSequenceMetadata()
.getSequenceName()
.equals(sequenceToCheckpoint.getSequenceName()),
"Cannot checkpoint a sequence [%s] which is not the latest one, sequences %s",
@@ -649,7 +682,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
new SeekableStreamStartSequenceNumbers<>(
stream,
sequenceToCheckpoint.getStartOffsets(),
- ioConfig.getStartSequenceNumbers().getExclusivePartitions()
+ sequenceToCheckpoint.getExclusiveStartPartitions()
)
),
createDataSourceMetadata(
@@ -1060,7 +1093,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
final SequenceOffsetType startOffset = entry.getValue();
if (!sequences.isEmpty()) {
- final SequenceOffsetType priorOffset = sequences.get(sequences.size() - 1).endOffsets.get(partition);
+ final SequenceOffsetType priorOffset = getLastSequenceMetadata().endOffsets.get(partition);
if (!startOffset.equals(priorOffset)) {
throw new ISE(
@@ -1072,9 +1105,26 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
}
+ if (!isEndOffsetExclusive() && !sequences.isEmpty()) {
+ final SequenceMetadata<PartitionIdType, SequenceOffsetType> lastMetadata = getLastSequenceMetadata();
+ if (!lastMetadata.endOffsets.keySet().equals(sequenceMetadata.getExclusiveStartPartitions())) {
+ throw new ISE(
+ "Exclusive start partitions[%s] for new sequence don't match to the prior offset[%s]",
+ sequenceMetadata.getExclusiveStartPartitions(),
+ lastMetadata
+ );
+ }
+ }
+
// Actually do the add.
sequences.add(sequenceMetadata);
}
+
+ private SequenceMetadata<PartitionIdType, SequenceOffsetType> getLastSequenceMetadata()
+ {
+ Preconditions.checkState(!sequences.isEmpty(), "Empty sequences");
+ return sequences.get(sequences.size() - 1);
+ }
/**
* Returns true if the given record has already been read, based on lastReadOffsets.
@@ -1453,7 +1503,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
// and after acquiring pauseLock to correctly guard against duplicate requests
Preconditions.checkState(sequenceNumbers.size() > 0, "WTH?! No Sequences found to set end sequences");
- final SequenceMetadata<PartitionIdType, SequenceOffsetType> latestSequence = sequences.get(sequences.size() - 1);
+ final SequenceMetadata<PartitionIdType, SequenceOffsetType> latestSequence = getLastSequenceMetadata();
final Set<PartitionIdType> exclusiveStartPartitions;
if (isEndOffsetExclusive()) {
@@ -1545,6 +1595,12 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
nextCheckpointTime = DateTimes.nowUtc().plus(tuningConfig.getIntermediateHandoffPeriod()).getMillis();
}
+ @VisibleForTesting
+ public CopyOnWriteArrayList<SequenceMetadata<PartitionIdType, SequenceOffsetType>> getSequences()
+ {
+ return sequences;
+ }
+
@GET
@Path("/checkpoints")
@Produces(MediaType.APPLICATION_JSON)
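
The behavioral core of the fix sits in computeExclusiveStartPartitionsForSequence above (plus the empty-check that getLastSequenceMetadata now enforces). A standalone, runnable walkthrough of that rule, re-implemented in simplified form for Kinesis-style String partitions, not the committed code itself:

import java.util.Collections;
import java.util.Map;
import java.util.Set;

public class ExclusiveStartWalkthrough
{
  // isEndOffsetExclusive() is true for Kafka and false for Kinesis.
  static Set<String> exclusiveStartPartitions(
      Map<String, String> sequenceStartOffsets,
      Map<String, String> ioConfigStartOffsets,
      Set<String> ioConfigExclusivePartitions,
      boolean isEndOffsetExclusive
  )
  {
    if (sequenceStartOffsets.equals(ioConfigStartOffsets)) {
      // First sequence: respect whatever the supervisor marked exclusive.
      return ioConfigExclusivePartitions;
    }
    // A later sequence starts at the prior sequence's end offsets; when end
    // offsets are inclusive (Kinesis) those records were already read, so
    // every partition becomes exclusive.
    return isEndOffsetExclusive ? Collections.emptySet() : sequenceStartOffsets.keySet();
  }

  public static void main(String[] args)
  {
    final Map<String, String> ioStart = Collections.singletonMap("shard0", "0");
    final Map<String, String> laterStart = Collections.singletonMap("shard0", "4");
    // First sequence, Kinesis: inherits the ioConfig's exclusive set -> []
    System.out.println(exclusiveStartPartitions(ioStart, ioStart, Collections.<String>emptySet(), false));
    // Later sequence, Kinesis: start "4" was a prior inclusive end -> [shard0]
    System.out.println(exclusiveStartPartitions(laterStart, ioStart, Collections.<String>emptySet(), false));
    // Later sequence, Kafka-style exclusive ends: nothing to skip -> []
    System.out.println(exclusiveStartPartitions(laterStart, ioStart, Collections.<String>emptySet(), true));
  }
}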
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
index 9cbafd0..20eec35 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
@@ -216,11 +216,12 @@ public class SequenceMetadata<PartitionIdType, SequenceOffsetType>
lock.lock();
try {
return "SequenceMetadata{" +
- "sequenceName='" + sequenceName + '\'' +
- ", sequenceId=" + sequenceId +
+ "sequenceId=" + sequenceId +
+ ", sequenceName='" + sequenceName + '\'' +
+ ", assignments=" + assignments +
", startOffsets=" + startOffsets +
+ ", exclusiveStartPartitions=" + exclusiveStartPartitions +
", endOffsets=" + endOffsets +
- ", assignments=" + assignments +
", sentinel=" + sentinel +
", checkpointed=" + checkpointed +
'}';
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 0b7a2c2..f904e85 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -122,6 +122,7 @@ import java.util.stream.Stream;
public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType> implements Supervisor
{
public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = "IS_INCREMENTAL_HANDOFF_SUPPORTED";
+ public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
private static final long MAX_RUN_FREQUENCY_MILLIS = 1000;
private static final long MINIMUM_FUTURE_TIMEOUT_IN_SECONDS = 120;
@@ -164,7 +165,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences,
Optional<DateTime> minimumMessageTime,
Optional<DateTime> maximumMessageTime,
- Set<PartitionIdType> exclusiveStartSequenceNumberPartitions
+ @Nullable Set<PartitionIdType> exclusiveStartSequenceNumberPartitions
)
{
this.groupId = groupId;
@@ -174,7 +175,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
this.checkpointSequences.put(0, startingSequences);
this.exclusiveStartSequenceNumberPartitions = exclusiveStartSequenceNumberPartitions != null
? exclusiveStartSequenceNumberPartitions
- : new HashSet<>();
+ : Collections.emptySet();
this.baseSequenceName = generateSequenceName(startingSequences, minimumMessageTime, maximumMessageTime);
}
@@ -2348,7 +2349,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
if (!getNotSetMarker().equals(sequence)) {
// if we are given a startingOffset (set by a previous task group which is pending completion) then use it
if (!isEndOfShard(sequence)) {
- builder.put(partition, makeSequenceNumber(sequence, useExclusiveStartSequenceNumberForStartSequence()));
+ builder.put(partition, makeSequenceNumber(sequence, useExclusiveStartSequenceNumberForNonFirstSequence()));
}
} else {
// if we don't have a startingOffset (first run or we had some previous failures and reset the sequences) then
@@ -2394,7 +2395,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
}
- return makeSequenceNumber(sequence, useExclusiveStartSequenceNumberForStartSequence());
+ return makeSequenceNumber(sequence, useExclusiveStartSequenceNumberForNonFirstSequence());
} else {
boolean useEarliestSequenceNumber = ioConfig.isUseEarliestSequenceNumber();
if (subsequentlyDiscoveredPartitions.contains(partition)) {
@@ -2463,7 +2464,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
for (PartitionIdType partition : startPartitions.keySet()) {
endPartitions.put(partition, getEndOfPartitionMarker());
}
- Set<PartitionIdType> exclusiveStartSequenceNumberPartitions = activelyReadingTaskGroups.get(groupId).exclusiveStartSequenceNumberPartitions;
+ Set<PartitionIdType> exclusiveStartSequenceNumberPartitions = activelyReadingTaskGroups
+ .get(groupId)
+ .exclusiveStartSequenceNumberPartitions;
DateTime minimumMessageTime = activelyReadingTaskGroups.get(groupId).minimumMessageTime.orNull();
DateTime maximumMessageTime = activelyReadingTaskGroups.get(groupId).maximumMessageTime.orNull();
@@ -2654,6 +2657,16 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
}
+ protected Map<String, Object> createBaseTaskContexts()
+ {
+ final Map<String, Object> contexts = new HashMap<>();
+ contexts.put(IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
+ if (spec.getContext() != null) {
+ contexts.putAll(spec.getContext());
+ }
+ return contexts;
+ }
+
/**
* creates a specific task IOConfig instance for Kafka/Kinesis
*
@@ -2821,5 +2834,5 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* For example, in Kinesis, the start offsets are inclusive for the first sequence, but exclusive for following
* sequences. In Kafka, start offsets are always inclusive.
*/
- protected abstract boolean useExclusiveStartSequenceNumberForStartSequence();
+ protected abstract boolean useExclusiveStartSequenceNumberForNonFirstSequence();
}
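
One observable consequence of the new createBaseTaskContexts above (an observation from the code, not a documented contract): the spec's context is merged after the built-in flag, so on a key collision the user-supplied entry now wins, where the old ImmutableMap.Builder path would have thrown on the duplicate key. Callers then layer the serialized checkpoints on top (checkpointsJson below is a stand-in name):

final Map<String, Object> context = createBaseTaskContexts(); // IS_INCREMENTAL_HANDOFF_SUPPORTED first, then spec context
context.put(CHECKPOINTS_CTX_KEY, checkpointsJson);            // checkpoints win over any spec-supplied value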
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index 961e066..2eb8858 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -37,6 +37,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
+import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
@@ -51,6 +52,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
private final DataSchema dataSchema;
private final SeekableStreamSupervisorTuningConfig tuningConfig;
private final SeekableStreamSupervisorIOConfig ioConfig;
+ @Nullable
private final Map<String, Object> context;
protected final ServiceEmitter emitter;
protected final DruidMonitorSchedulerConfig monitorSchedulerConfig;
@@ -61,7 +63,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("tuningConfig") SeekableStreamSupervisorTuningConfig tuningConfig,
@JsonProperty("ioConfig") SeekableStreamSupervisorIOConfig ioConfig,
- @JsonProperty("context") Map<String, Object> context,
+ @JsonProperty("context") @Nullable Map<String, Object> context,
@JsonProperty("suspended") Boolean suspended,
@JacksonInject TaskStorage taskStorage,
@JacksonInject TaskMaster taskMaster,
@@ -107,6 +109,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
return ioConfig;
}
+ @Nullable
@JsonProperty
public Map<String, Object> getContext()
{
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 421007d..22f602a 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -892,8 +892,11 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
if (!startMetadataMatchesExisting) {
// Not in the desired start state.
- log.info("Not updating metadata, existing state is not the expected start state.");
- log.debug("Existing database state [%s], request's start metadata [%s]", oldCommitMetadataFromDb, startMetadata);
+ log.error(
+ "Not updating metadata, existing state[%s] in metadata store doesn't match to the new start state[%s].",
+ oldCommitMetadataBytesFromDb,
+ startMetadata
+ );
return DataSourceMetadataUpdateResult.FAILURE;
}