You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2019/04/24 22:14:06 UTC
[incubator-druid] 05/20: Support Kafka supervisor adopting running
tasks between versions (#7212)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch 0.14.1-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
commit 6756b6fbced9029c0c5d3301b13dce03e6f75047
Author: Justin Borromeo <jb...@edu.uwaterloo.ca>
AuthorDate: Wed Apr 10 18:16:38 2019 -0700
Support Kafka supervisor adopting running tasks between versions (#7212)
* Recomputed hash in isTaskCurrent() and added tests
* Fixed checkstyle violations
* Fixed failing tests
* Made TestableKafkaSupervisorWithCustomIsTaskCurrent static
* Added documentation
* Changed baseSequenceName
* Added comment
* Work in progress
* Fixed imports
* Reverted lambda change to keep the diff small
* Cleanup
* Added comment
* Reinserted Kafka tests
* Re-added Kinesis test
* Re-added bad partition assignment in Kinesis supervisor test
* Addressed review nit
* Fixed misnamed variable
---
.../kafka/KafkaIndexTaskTuningConfigTest.java | 96 ++
.../kafka/supervisor/KafkaSupervisorTest.java | 709 +++++++++++--
.../TestModifiedKafkaIndexTaskTuningConfig.java | 88 ++
.../kinesis/KinesisIndexTaskTuningConfigTest.java | 117 +++
.../kinesis/supervisor/KinesisSupervisorTest.java | 1108 +++++++++++++++-----
.../TestModifiedKinesisIndexTaskTuningConfig.java | 132 +++
.../supervisor/SeekableStreamSupervisor.java | 165 ++-
.../druid/segment/indexing/DataSchemaTest.java | 79 ++
.../segment/indexing/TestModifiedDataSchema.java | 56 +
9 files changed, 2126 insertions(+), 424 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
index 14ecd4e..63b0e98 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
+import org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.indexing.TuningConfig;
@@ -30,6 +31,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.File;
+import java.io.IOException;
public class KafkaIndexTaskTuningConfigTest
{
@@ -145,6 +147,100 @@ public class KafkaIndexTaskTuningConfigTest
Assert.assertEquals(5L, copy.getHandoffConditionTimeout());
}
+ @Test
+ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
+ {
+ KafkaIndexTaskTuningConfig base = new KafkaIndexTaskTuningConfig(
+ 1,
+ null,
+ 2,
+ 10L,
+ new Period("PT3S"),
+ new File("/tmp/xxx"),
+ 4,
+ new IndexSpec(),
+ true,
+ true,
+ 5L,
+ null,
+ null,
+ null,
+ true,
+ 42,
+ 42
+ );
+
+ String serialized = mapper.writeValueAsString(base);
+ TestModifiedKafkaIndexTaskTuningConfig deserialized =
+ mapper.readValue(serialized, TestModifiedKafkaIndexTaskTuningConfig.class);
+
+ Assert.assertEquals(null, deserialized.getExtra());
+ Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
+ Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
+ Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
+ Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows());
+ Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod());
+ Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory());
+ Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists());
+ Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
+ Assert.assertEquals(base.getBuildV9Directly(), deserialized.getBuildV9Directly());
+ Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions());
+ Assert.assertEquals(base.getHandoffConditionTimeout(), deserialized.getHandoffConditionTimeout());
+ Assert.assertEquals(base.isResetOffsetAutomatically(), deserialized.isResetOffsetAutomatically());
+ Assert.assertEquals(base.getSegmentWriteOutMediumFactory(), deserialized.getSegmentWriteOutMediumFactory());
+ Assert.assertEquals(base.getIntermediateHandoffPeriod(), deserialized.getIntermediateHandoffPeriod());
+ Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions());
+ Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
+ Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
+ }
+
+ @Test
+ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
+ {
+ TestModifiedKafkaIndexTaskTuningConfig base = new TestModifiedKafkaIndexTaskTuningConfig(
+ 1,
+ null,
+ 2,
+ 10L,
+ new Period("PT3S"),
+ new File("/tmp/xxx"),
+ 4,
+ new IndexSpec(),
+ true,
+ true,
+ 5L,
+ null,
+ null,
+ null,
+ true,
+ 42,
+ 42,
+ "extra string"
+ );
+
+ String serialized = mapper.writeValueAsString(base);
+ KafkaIndexTaskTuningConfig deserialized =
+ mapper.readValue(serialized, KafkaIndexTaskTuningConfig.class);
+
+ Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
+ Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
+ Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
+ Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows());
+ Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod());
+ Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory());
+ Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists());
+ Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
+ Assert.assertEquals(base.getBuildV9Directly(), deserialized.getBuildV9Directly());
+ Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions());
+ Assert.assertEquals(base.getHandoffConditionTimeout(), deserialized.getHandoffConditionTimeout());
+ Assert.assertEquals(base.isResetOffsetAutomatically(), deserialized.isResetOffsetAutomatically());
+ Assert.assertEquals(base.getSegmentWriteOutMediumFactory(), deserialized.getSegmentWriteOutMediumFactory());
+ Assert.assertEquals(base.getIntermediateHandoffPeriod(), deserialized.getIntermediateHandoffPeriod());
+ Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions());
+ Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
+ Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
+ }
+
private static KafkaIndexTaskTuningConfig copy(KafkaIndexTaskTuningConfig config)
{
return new KafkaIndexTaskTuningConfig(
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index bb8c6a2..62280e0 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -49,6 +49,7 @@ import org.apache.druid.indexing.kafka.KafkaIndexTask;
import org.apache.druid.indexing.kafka.KafkaIndexTaskClient;
import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
import org.apache.druid.indexing.kafka.KafkaIndexTaskIOConfig;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
import org.apache.druid.indexing.kafka.test.TestBroker;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -61,6 +62,7 @@ import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
import org.apache.druid.java.util.common.DateTimes;
@@ -139,7 +141,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
private static int topicPostfix;
private static ZkUtils zkUtils;
-
private final int numThreads;
private KafkaSupervisor supervisor;
@@ -259,7 +260,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testNoInitialState() throws Exception
{
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
addSomeEvents(1);
Capture<KafkaIndexTask> captured = Capture.newInstance();
@@ -315,7 +316,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testSkipOffsetGaps() throws Exception
{
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null, true);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
addSomeEvents(1);
Capture<KafkaIndexTask> captured = Capture.newInstance();
@@ -338,13 +339,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
KafkaIndexTask task = captured.getValue();
KafkaIndexTaskIOConfig taskConfig = task.getIOConfig();
- Assert.assertTrue("skipOffsetGaps", taskConfig.isSkipOffsetGaps());
}
@Test
public void testMultiTask() throws Exception
{
- supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null, false);
addSomeEvents(1);
Capture<KafkaIndexTask> captured = Capture.newInstance(CaptureType.ALL);
@@ -399,7 +399,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testReplicas() throws Exception
{
- supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, false);
addSomeEvents(1);
Capture<KafkaIndexTask> captured = Capture.newInstance(CaptureType.ALL);
@@ -454,7 +454,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testLateMessageRejectionPeriod() throws Exception
{
- supervisor = getSupervisor(2, 1, true, "PT1H", new Period("PT1H"), null, false);
+ supervisor = getTestableSupervisor(2, 1, true, "PT1H", new Period("PT1H"), null, false);
addSomeEvents(1);
Capture<KafkaIndexTask> captured = Capture.newInstance(CaptureType.ALL);
@@ -493,7 +493,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testEarlyMessageRejectionPeriod() throws Exception
{
- supervisor = getSupervisor(2, 1, true, "PT1H", null, new Period("PT1H"), false);
+ supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, new Period("PT1H"), false);
addSomeEvents(1);
Capture<KafkaIndexTask> captured = Capture.newInstance(CaptureType.ALL);
@@ -535,7 +535,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
*/
public void testLatestOffset() throws Exception
{
- supervisor = getSupervisor(1, 1, false, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(1, 1, false, "PT1H", null, null, false);
addSomeEvents(1100);
Capture<KafkaIndexTask> captured = Capture.newInstance();
@@ -576,7 +576,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
*/
public void testDatasourceMetadata() throws Exception
{
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
addSomeEvents(100);
Capture<KafkaIndexTask> captured = Capture.newInstance();
@@ -615,7 +615,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test(expected = ISE.class)
public void testBadMetadataOffsets() throws Exception
{
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
addSomeEvents(1);
expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
@@ -632,47 +632,14 @@ public class KafkaSupervisorTest extends EasyMockSupport
}
@Test
- public void testKillIncompatibleTasks() throws Exception
+ public void testDontKillTasksWithMismatchedDatasourceAndType() throws Exception
{
- supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, false);
addSomeEvents(1);
- // unexpected # of partitions (kill)
+ // different datasource (don't kill)
Task id1 = createKafkaIndexTask(
"id1",
- DATASOURCE,
- 1,
- new SeekableStreamStartSequenceNumbers<>("topic", ImmutableMap.of(0, 0L), ImmutableSet.of()),
- new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, 10L)),
- null,
- null
- );
-
- // correct number of partitions and ranges (don't kill)
- Task id2 = createKafkaIndexTask(
- "id2",
- DATASOURCE,
- 0,
- new SeekableStreamStartSequenceNumbers<>("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L), ImmutableSet.of()),
- new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 333L)),
- null,
- null
- );
-
- // unexpected range on partition 2 (kill)
- Task id3 = createKafkaIndexTask(
- "id3",
- DATASOURCE,
- 1,
- new SeekableStreamStartSequenceNumbers<>("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 1L), ImmutableSet.of()),
- new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 330L)),
- null,
- null
- );
-
- // different datasource (don't kill)
- Task id4 = createKafkaIndexTask(
- "id4",
"other-datasource",
2,
new SeekableStreamStartSequenceNumbers<>("topic", ImmutableMap.of(0, 0L), ImmutableSet.of()),
@@ -682,8 +649,8 @@ public class KafkaSupervisorTest extends EasyMockSupport
);
// non KafkaIndexTask (don't kill)
- Task id5 = new RealtimeIndexTask(
- "id5",
+ Task id2 = new RealtimeIndexTask(
+ "id2",
null,
new FireDepartment(
dataSchema,
@@ -693,18 +660,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
null
);
- List<Task> existingTasks = ImmutableList.of(id1, id2, id3, id4, id5);
+ List<Task> existingTasks = ImmutableList.of(id1, id2);
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes();
- expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
- expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
- expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
- expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
- expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
- expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
expect(taskClient.getStatusAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(Status.NOT_STARTED))
.anyTimes();
@@ -716,18 +677,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
null
)
).anyTimes();
- expect(taskClient.stopAsync("id1", false)).andReturn(Futures.immediateFuture(true));
- expect(taskClient.stopAsync("id3", false)).andReturn(Futures.immediateFuture(false));
+
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
- taskQueue.shutdown("id3", "Task [%s] failed to stop in a timely manner, killing task", "id3");
- expect(taskQueue.add(anyObject(Task.class))).andReturn(true);
-
- TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
- checkpoints.put(0, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L));
- expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
- .andReturn(Futures.immediateFuture(checkpoints))
- .times(2);
+ expect(taskQueue.add(anyObject(Task.class))).andReturn(true).anyTimes();
replayAll();
@@ -739,7 +692,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testKillBadPartitionAssignment() throws Exception
{
- supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null, false);
addSomeEvents(1);
Task id1 = createKafkaIndexTask(
@@ -843,10 +796,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
verifyAll();
}
+
@Test
public void testRequeueTaskWhenFailed() throws Exception
{
- supervisor = getSupervisor(2, 2, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null, false);
addSomeEvents(1);
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@@ -935,7 +889,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testRequeueAdoptedTaskWhenFailed() throws Exception
{
- supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, false);
addSomeEvents(1);
DateTime now = DateTimes.nowUtc();
@@ -1036,7 +990,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testQueueNextTasksOnSuccess() throws Exception
{
- supervisor = getSupervisor(2, 2, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null, false);
addSomeEvents(1);
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@@ -1138,7 +1092,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
{
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
- supervisor = getSupervisor(2, 2, true, "PT1M", null, null, false);
+ supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null, false);
addSomeEvents(100);
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@@ -1233,7 +1187,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
{
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
addSomeEvents(1);
Task task = createKafkaIndexTask(
@@ -1348,7 +1302,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
{
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
addSomeEvents(1);
Task task = createKafkaIndexTask(
@@ -1456,7 +1410,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
addSomeEvents(6);
Task id1 = createKafkaIndexTask(
@@ -1563,7 +1517,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception
{
- supervisor = getSupervisor(2, 2, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null, false);
addSomeEvents(1);
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@@ -1622,7 +1576,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
{
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
- supervisor = getSupervisor(2, 2, true, "PT1M", null, null, false);
+ supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null, false);
addSomeEvents(100);
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@@ -1708,7 +1662,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
{
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
- supervisor = getSupervisor(2, 2, true, "PT1M", null, null, false);
+ supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null, false);
addSomeEvents(100);
Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
@@ -1799,7 +1753,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test(expected = IllegalStateException.class)
public void testStopNotStarted()
{
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
supervisor.stop(false);
}
@@ -1811,7 +1765,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskRunner.unregisterListener(StringUtils.format("KafkaSupervisor-%s", DATASOURCE));
replayAll();
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
supervisor.start();
supervisor.stop(false);
@@ -1825,7 +1779,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
- supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, false);
addSomeEvents(1);
Task id1 = createKafkaIndexTask(
@@ -1937,7 +1891,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
supervisor.start();
supervisor.runInternal();
verifyAll();
@@ -1954,7 +1908,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testResetDataSourceMetadata() throws Exception
{
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
@@ -1970,7 +1924,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
Capture<DataSourceMetadata> captureDataSourceMetadata = EasyMock.newCapture();
KafkaDataSourceMetadata kafkaDataSourceMetadata = new KafkaDataSourceMetadata(
- new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 1000L, 1, 1000L, 2, 1000L), ImmutableSet.of())
+ new SeekableStreamStartSequenceNumbers<>(
+ topic,
+ ImmutableMap.of(0, 1000L, 1, 1000L, 2, 1000L),
+ ImmutableSet.of()
+ )
);
KafkaDataSourceMetadata resetMetadata = new KafkaDataSourceMetadata(
@@ -2006,7 +1964,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testResetNoDataSourceMetadata() throws Exception
{
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false);
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
@@ -2042,7 +2000,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
- supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, false);
addSomeEvents(1);
Task id1 = createKafkaIndexTask(
@@ -2140,7 +2098,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
public void testNoDataIngestionTasks() throws Exception
{
final DateTime startTime = DateTimes.nowUtc();
- supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+ supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null, false);
//not adding any events
Task id1 = createKafkaIndexTask(
"id1",
@@ -2236,7 +2194,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
public void testCheckpointForInactiveTaskGroup()
throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
{
- supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+ supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null, false);
//not adding any events
final Task id1 = createKafkaIndexTask(
"id1",
@@ -2329,8 +2287,16 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.checkpoint(
0,
((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
- new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of())),
- new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, fakeCheckpoints, fakeCheckpoints.keySet()))
+ new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
+ topic,
+ checkpoints.get(0),
+ ImmutableSet.of()
+ )),
+ new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
+ topic,
+ fakeCheckpoints,
+ fakeCheckpoints.keySet()
+ ))
);
while (supervisor.getNoticesQueueSize() > 0) {
@@ -2348,7 +2314,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
public void testCheckpointForUnknownTaskGroup()
throws InterruptedException
{
- supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+ supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null, false);
//not adding any events
final Task id1 = createKafkaIndexTask(
"id1",
@@ -2409,8 +2375,16 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.checkpoint(
0,
((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
- new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, Collections.emptyMap(), ImmutableSet.of())),
- new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, Collections.emptyMap(), ImmutableSet.of()))
+ new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
+ topic,
+ Collections.emptyMap(),
+ ImmutableSet.of()
+ )),
+ new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
+ topic,
+ Collections.emptyMap(),
+ ImmutableSet.of()
+ ))
);
while (supervisor.getNoticesQueueSize() > 0) {
@@ -2437,7 +2411,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
public void testCheckpointWithNullTaskGroupId()
throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
{
- supervisor = getSupervisor(1, 3, true, "PT1S", null, null, false);
+ supervisor = getTestableSupervisor(1, 3, true, "PT1S", null, null, false);
//not adding any events
final Task id1 = createKafkaIndexTask(
"id1",
@@ -2515,7 +2489,11 @@ public class KafkaSupervisorTest extends EasyMockSupport
supervisor.checkpoint(
null,
((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(),
- new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of())),
+ new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
+ topic,
+ checkpoints.get(0),
+ ImmutableSet.of()
+ )),
new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(topic, newCheckpoints.get(0), newCheckpoints.get(0).keySet())
)
@@ -2531,7 +2509,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testSuspendedNoRunningTasks() throws Exception
{
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false, true, kafkaHost);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false, true, kafkaHost);
addSomeEvents(1);
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -2564,7 +2542,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
- supervisor = getSupervisor(2, 1, true, "PT1H", null, null, false, true, kafkaHost);
+ supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, false, true, kafkaHost);
addSomeEvents(1);
Task id1 = createKafkaIndexTask(
@@ -2672,7 +2650,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class));
replayAll();
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null, false, true, kafkaHost);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, false, true, kafkaHost);
supervisor.start();
supervisor.runInternal();
verifyAll();
@@ -2689,7 +2667,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
public void testFailedInitializationAndRecovery() throws Exception
{
// Block the supervisor initialization with a bad hostname config, make sure this doesn't block the lifecycle
- supervisor = getSupervisor(
+ supervisor = getTestableSupervisor(
1,
1,
true,
@@ -2797,7 +2775,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
@Test
public void testGetCurrentTotalStats()
{
- supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null, false, false, kafkaHost);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition(0),
ImmutableMap.of(0, 0L),
@@ -2838,6 +2816,286 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(ImmutableMap.of("task2", ImmutableMap.of("prop2", "val2")), stats.get("1"));
}
+  @Test
+  public void testDoNotKillCompatibleTasks() throws Exception
+  {
+    // isTaskCurrent() is stubbed to return true, so the running task must be adopted rather than killed.
+    int numReplicas = 2;
+    supervisor = getTestableSupervisorCustomIsTaskCurrent(
+        numReplicas,
+        1,
+        true,
+        "PT1H",
+        new Period("P1D"),
+        new Period("P1D"),
+        false,
+        true
+    );
+
+    addSomeEvents(1);
+
+    // One pre-existing task (task group 0) reading partitions 0 and 2.
+    Task task = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, 0L, 2, 0L),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+        ),
+        null,
+        null
+    );
+
+    List<Task> existingTasks = ImmutableList.of(task);
+
+    // id1 is an active task whose stored status is RUNNING.
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes();
+    EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes();
+    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
+            .andReturn(Futures.immediateFuture(Status.NOT_STARTED))
+            .anyTimes();
+    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
+            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
+            .anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE))
+            .andReturn(new KafkaDataSourceMetadata(null)).anyTimes();
+
+    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
+    // One submission expected: id1 is kept, so presumably only the second replica has to be created.
+    EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true);
+
+    // id1 reports a single checkpoint equal to its start offsets.
+    TreeMap<Integer, Map<Integer, Long>> checkpoints1 = new TreeMap<>();
+    checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
+
+    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(checkpoints1))
+            .times(numReplicas);
+
+    replayAll();
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+  }
+
+  @Test
+  public void testKillIncompatibleTasks() throws Exception
+  {
+    // isTaskCurrent() is stubbed to return false, so the running task must be stopped and replaced.
+    int numReplicas = 2;
+    supervisor = getTestableSupervisorCustomIsTaskCurrent(
+        numReplicas,
+        1,
+        true,
+        "PT1H",
+        new Period("P1D"),
+        new Period("P1D"),
+        false,
+        false
+    );
+
+    addSomeEvents(1);
+
+    // One pre-existing task (task group 0) reading partitions 0 and 2.
+    Task task = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, 0L, 2, 0L),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    List<Task> existingTasks = ImmutableList.of(task);
+    // id1 is an active task whose stored status is RUNNING.
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes();
+    EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes();
+    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
+            .andReturn(Futures.immediateFuture(Status.NOT_STARTED))
+            .anyTimes();
+    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
+            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
+            .anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE))
+            .andReturn(new KafkaDataSourceMetadata(null)).anyTimes();
+
+    // The stale task is expected to be stopped, and one replacement task submitted per replica.
+    EasyMock.expect(taskClient.stopAsync("id1", false)).andReturn(Futures.immediateFuture(true));
+    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
+    EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true).times(numReplicas);
+
+    replayAll();
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+  }
+
+  @Test
+  public void testIsTaskCurrent()
+  {
+    // Only id1 matches the supervisor's data schema, tuning config and the group's start offsets; id2, id3
+    // and id4 each differ in exactly one of these and must not be considered current.
+    final int taskGroupId = 42;
+    DateTime minMessageTime = DateTimes.nowUtc();
+    // Derived from minMessageTime so both bounds come from a single clock reading.
+    DateTime maxMessageTime = minMessageTime.plus(10000);
+
+    KafkaSupervisor kafkaSupervisor = getSupervisor(
+        2,
+        1,
+        true,
+        "PT1H",
+        new Period("P1D"),
+        new Period("P1D"),
+        false,
+        kafkaHost,
+        dataSchema,
+        tuningConfig
+    );
+
+    kafkaSupervisor.addTaskGroupToActivelyReadingTaskGroup(
+        taskGroupId,
+        ImmutableMap.of(0, 0L, 2, 0L),
+        Optional.of(minMessageTime),
+        Optional.of(maxMessageTime),
+        ImmutableSet.of("id1", "id2", "id3", "id4"),
+        ImmutableSet.of()
+    );
+
+    DataSchema modifiedDataSchema = getDataSchema("some other datasource");
+
+    KafkaSupervisorTuningConfig modifiedTuningConfig = new KafkaSupervisorTuningConfig(
+        42, // differs from the shared tuningConfig; this is what makes id3 stale
+        null,
+        50000,
+        null,
+        new Period("P1Y"),
+        new File("/test"),
+        null,
+        null,
+        true,
+        false,
+        null,
+        null,
+        null,
+        numThreads,
+        TEST_CHAT_THREADS,
+        TEST_CHAT_RETRIES,
+        TEST_HTTP_TIMEOUT,
+        TEST_SHUTDOWN_TIMEOUT,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+
+    KafkaIndexTask taskFromStorage = createKafkaIndexTask(
+        "id1",
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, 0L, 2, 0L),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+        ),
+        minMessageTime,
+        maxMessageTime,
+        dataSchema
+    );
+
+    KafkaIndexTask taskFromStorageMismatchedDataSchema = createKafkaIndexTask(
+        "id2",
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, 0L, 2, 0L),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+        ),
+        minMessageTime,
+        maxMessageTime,
+        modifiedDataSchema
+    );
+
+    KafkaIndexTask taskFromStorageMismatchedTuningConfig = createKafkaIndexTask(
+        "id3",
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, 0L, 2, 0L),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+        ),
+        minMessageTime,
+        maxMessageTime,
+        dataSchema,
+        modifiedTuningConfig
+    );
+
+    KafkaIndexTask taskFromStorageMismatchedPartitionsWithTaskGroup = createKafkaIndexTask(
+        "id4",
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, 0L, 2, 6L),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+        ),
+        minMessageTime,
+        maxMessageTime,
+        dataSchema
+    );
+
+    EasyMock.expect(taskStorage.getTask("id1"))
+            .andReturn(Optional.of(taskFromStorage)).once();
+    EasyMock.expect(taskStorage.getTask("id2"))
+            .andReturn(Optional.of(taskFromStorageMismatchedDataSchema)).once();
+    EasyMock.expect(taskStorage.getTask("id3"))
+            .andReturn(Optional.of(taskFromStorageMismatchedTuningConfig)).once();
+    EasyMock.expect(taskStorage.getTask("id4"))
+            .andReturn(Optional.of(taskFromStorageMismatchedPartitionsWithTaskGroup)).once();
+
+    replayAll();
+
+    Assert.assertTrue(kafkaSupervisor.isTaskCurrent(taskGroupId, "id1"));
+    Assert.assertFalse(kafkaSupervisor.isTaskCurrent(taskGroupId, "id2"));
+    Assert.assertFalse(kafkaSupervisor.isTaskCurrent(taskGroupId, "id3"));
+    Assert.assertFalse(kafkaSupervisor.isTaskCurrent(taskGroupId, "id4"));
+    verifyAll();
+  }
+
private void addSomeEvents(int numEventsPerPartition) throws Exception
{
//create topic manually
@@ -2859,7 +3117,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
}
}
- private KafkaSupervisor getSupervisor(
+ private KafkaSupervisor getTestableSupervisor(
int replicas,
int taskCount,
boolean useEarliestOffset,
@@ -2869,7 +3127,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
boolean skipOffsetGaps
)
{
- return getSupervisor(
+ return getTestableSupervisor(
replicas,
taskCount,
useEarliestOffset,
@@ -2882,7 +3140,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
);
}
- private KafkaSupervisor getSupervisor(
+ private KafkaSupervisor getTestableSupervisor(
int replicas,
int taskCount,
boolean useEarliestOffset,
@@ -2959,6 +3217,170 @@ public class KafkaSupervisorTest extends EasyMockSupport
);
}
+  /**
+   * Creates a supervisor whose {@code isTaskCurrent()} is stubbed, for tests that need to control whether
+   * existing tasks are treated as current (kept) or as stale (stopped and replaced).
+   *
+   * @param isTaskCurrentReturn the value returned by every {@code isTaskCurrent()} call
+   */
+  private KafkaSupervisor getTestableSupervisorCustomIsTaskCurrent(
+      int replicas,
+      int taskCount,
+      boolean useEarliestOffset,
+      String duration,
+      Period lateMessageRejectionPeriod,
+      Period earlyMessageRejectionPeriod,
+      boolean suspended,
+      boolean isTaskCurrentReturn
+  )
+  {
+    KafkaIndexTaskClientFactory clientFactory = new KafkaIndexTaskClientFactory(null, null)
+    {
+      @Override
+      public KafkaIndexTaskClient build(
+          TaskInfoProvider taskInfoProvider,
+          String dataSource,
+          int numThreads,
+          Duration httpTimeout,
+          long numRetries
+      )
+      {
+        Assert.assertEquals(TEST_CHAT_THREADS, numThreads);
+        Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout);
+        Assert.assertEquals(TEST_CHAT_RETRIES, numRetries);
+        return taskClient;
+      }
+    };
+
+    Map<String, Object> consumerProps = new HashMap<>();
+    consumerProps.put("myCustomKey", "myCustomValue");
+    consumerProps.put("bootstrap.servers", kafkaHost);
+    consumerProps.put("isolation.level", "read_committed");
+    KafkaSupervisorIOConfig ioConfig = new KafkaSupervisorIOConfig(
+        topic,
+        replicas,
+        taskCount,
+        new Period(duration),
+        consumerProps,
+        KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+        new Period("P1D"),
+        new Period("PT30S"),
+        useEarliestOffset,
+        new Period("PT30M"),
+        lateMessageRejectionPeriod,
+        earlyMessageRejectionPeriod,
+        false
+    );
+
+    return new TestableKafkaSupervisorWithCustomIsTaskCurrent(
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        clientFactory,
+        objectMapper,
+        new KafkaSupervisorSpec(
+            dataSchema,
+            tuningConfig,
+            ioConfig,
+            null,
+            suspended,
+            taskStorage,
+            taskMaster,
+            indexerMetadataStorageCoordinator,
+            clientFactory,
+            objectMapper,
+            new NoopServiceEmitter(),
+            new DruidMonitorSchedulerConfig(),
+            rowIngestionMetersFactory
+        ),
+        rowIngestionMetersFactory,
+        isTaskCurrentReturn
+    );
+  }
+
+  /**
+   * Creates a plain {@link KafkaSupervisor}: unlike {@code TestableKafkaSupervisor}, it keeps the production
+   * {@code generateSequenceName()}, so real sequence names are computed.
+   *
+   * @param dataSchema   data schema placed in the supervisor spec
+   * @param tuningConfig tuning config placed in the supervisor spec
+   */
+  private KafkaSupervisor getSupervisor(
+      int replicas,
+      int taskCount,
+      boolean useEarliestOffset,
+      String duration,
+      Period lateMessageRejectionPeriod,
+      Period earlyMessageRejectionPeriod,
+      boolean suspended,
+      String kafkaHost,
+      DataSchema dataSchema,
+      KafkaSupervisorTuningConfig tuningConfig
+  )
+  {
+    Map<String, Object> consumerProperties = new HashMap<>();
+    consumerProperties.put("myCustomKey", "myCustomValue");
+    consumerProperties.put("bootstrap.servers", kafkaHost);
+    consumerProperties.put("isolation.level", "read_committed");
+    KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
+        topic,
+        replicas,
+        taskCount,
+        new Period(duration),
+        consumerProperties,
+        KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+        new Period("P1D"),
+        new Period("PT30S"),
+        useEarliestOffset,
+        new Period("PT30M"),
+        lateMessageRejectionPeriod,
+        earlyMessageRejectionPeriod,
+        false
+    );
+
+    KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(null, null)
+    {
+      @Override
+      public KafkaIndexTaskClient build(
+          TaskInfoProvider taskInfoProvider,
+          String dataSource,
+          int numThreads,
+          Duration httpTimeout,
+          long numRetries
+      )
+      {
+        Assert.assertEquals(TEST_CHAT_THREADS, numThreads);
+        Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout);
+        Assert.assertEquals(TEST_CHAT_RETRIES, numRetries);
+        return taskClient;
+      }
+    };
+
+    return new KafkaSupervisor(
+        taskStorage,
+        taskMaster,
+        indexerMetadataStorageCoordinator,
+        taskClientFactory,
+        objectMapper,
+        new KafkaSupervisorSpec(
+            dataSchema,
+            tuningConfig,
+            kafkaSupervisorIOConfig,
+            null,
+            suspended,
+            taskStorage,
+            taskMaster,
+            indexerMetadataStorageCoordinator,
+            taskClientFactory,
+            objectMapper,
+            new NoopServiceEmitter(),
+            new DruidMonitorSchedulerConfig(),
+            rowIngestionMetersFactory
+        ),
+        rowIngestionMetersFactory
+    );
+  }
+
private static DataSchema getDataSchema(String dataSource)
{
List<DimensionSchema> dimensions = new ArrayList<>();
@@ -3004,10 +3426,54 @@ public class KafkaSupervisorTest extends EasyMockSupport
DateTime maximumMessageTime
)
{
+ return createKafkaIndexTask(
+ id,
+ taskGroupId,
+ startPartitions,
+ endPartitions,
+ minimumMessageTime,
+ maximumMessageTime,
+ getDataSchema(dataSource)
+ );
+ }
+
+  /**
+   * Creates a task with the given data schema and the task-level conversion of the shared
+   * {@code tuningConfig} field.
+   */
+  private KafkaIndexTask createKafkaIndexTask(
+      String id,
+      int taskGroupId,
+      SeekableStreamStartSequenceNumbers<Integer, Long> startPartitions,
+      SeekableStreamEndSequenceNumbers<Integer, Long> endPartitions,
+      DateTime minimumMessageTime,
+      DateTime maximumMessageTime,
+      DataSchema schema
+  )
+  {
+    final KafkaIndexTaskTuningConfig taskTuningConfig =
+        (KafkaIndexTaskTuningConfig) tuningConfig.convertToTaskTuningConfig();
+    return createKafkaIndexTask(
+        id, taskGroupId, startPartitions, endPartitions,
+        minimumMessageTime, maximumMessageTime, schema, taskTuningConfig
+    );
+  }
+
+ private KafkaIndexTask createKafkaIndexTask(
+ String id,
+ int taskGroupId,
+ SeekableStreamStartSequenceNumbers<Integer, Long> startPartitions,
+ SeekableStreamEndSequenceNumbers<Integer, Long> endPartitions,
+ DateTime minimumMessageTime,
+ DateTime maximumMessageTime,
+ DataSchema schema,
+ KafkaIndexTaskTuningConfig tuningConfig
+ )
+ {
return new KafkaIndexTask(
id,
null,
- getDataSchema(dataSource),
+ schema,
tuningConfig,
new KafkaIndexTaskIOConfig(
taskGroupId,
@@ -3060,7 +3526,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
{
return dataSource;
}
-
}
private static class TestableKafkaSupervisor extends KafkaSupervisor
@@ -3090,7 +3555,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
protected String generateSequenceName(
Map<Integer, Long> startPartitions,
Optional<DateTime> minimumMessageTime,
- Optional<DateTime> maximumMessageTime
+ Optional<DateTime> maximumMessageTime,
+ DataSchema dataSchema,
+ SeekableStreamIndexTaskTuningConfig tuningConfig
)
{
final int groupId = getTaskGroupIdForPartition(startPartitions.keySet().iterator().next());
@@ -3098,5 +3565,37 @@ public class KafkaSupervisorTest extends EasyMockSupport
}
}
+  /**
+   * A {@link TestableKafkaSupervisor} whose {@code isTaskCurrent()} ignores its arguments and returns the
+   * value fixed at construction, letting tests force existing tasks to be treated as current or stale.
+   */
+  private static class TestableKafkaSupervisorWithCustomIsTaskCurrent extends TestableKafkaSupervisor
+  {
+    private final boolean isTaskCurrentReturn;
+
+    public TestableKafkaSupervisorWithCustomIsTaskCurrent(
+        TaskStorage taskStorage,
+        TaskMaster taskMaster,
+        IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
+        KafkaIndexTaskClientFactory taskClientFactory,
+        ObjectMapper mapper,
+        KafkaSupervisorSpec spec,
+        RowIngestionMetersFactory rowIngestionMetersFactory,
+        boolean isTaskCurrentReturn
+    )
+    {
+      super(
+          taskStorage, taskMaster, indexerMetadataStorageCoordinator, taskClientFactory, mapper, spec,
+          rowIngestionMetersFactory
+      );
+      this.isTaskCurrentReturn = isTaskCurrentReturn;
+    }
+
+    @Override
+    public boolean isTaskCurrent(int taskGroupId, String taskId)
+    {
+      return isTaskCurrentReturn;
+    }
+  }
}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
new file mode 100644
index 0000000..3cc124f
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka.test;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.io.File;
+
+/**
+ * Test-only variant of {@link KafkaIndexTaskTuningConfig} that adds one JSON property, {@code extra}, which the
+ * production class does not define. Presumably used to check serde compatibility across config versions.
+ */
+@JsonTypeName("KafkaTuningConfig")
+public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuningConfig
+{
+  @Nullable
+  private final String extra;
+
+  @JsonCreator
+  public TestModifiedKafkaIndexTaskTuningConfig(
+      @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
+      @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
+      @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
+      @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
+      @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
+      @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
+      @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
+      @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
+      // Accepted so configs that still contain it can be read; the value is not forwarded (see super call).
+      @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
+      @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
+      @JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout,
+      @JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically,
+      @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod,
+      @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
+      @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
+      @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
+      @JsonProperty("extra") @Nullable String extra
+  )
+  {
+    super(
+        maxRowsInMemory, maxBytesInMemory,
+        maxRowsPerSegment, maxTotalRows,
+        intermediatePersistPeriod, basePersistDirectory,
+        maxPendingPersists, indexSpec,
+        true, // buildV9Directly: a constant is passed instead of the deserialized value
+        reportParseExceptions, handoffConditionTimeout,
+        resetOffsetAutomatically, segmentWriteOutMediumFactory,
+        intermediateHandoffPeriod, logParseExceptions,
+        maxParseExceptions, maxSavedParseExceptions
+    );
+    this.extra = extra;
+  }
+
+  /**
+   * Returns the additional property, or {@code null} if none was supplied.
+   */
+  @JsonProperty("extra")
+  public String getExtra()
+  {
+    return extra;
+  }
+}
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
index 2983324..57ffebc 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfig;
+import org.apache.druid.indexing.kinesis.test.TestModifiedKinesisIndexTaskTuningConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.indexing.TuningConfig;
@@ -34,6 +35,7 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.File;
+import java.io.IOException;
public class KinesisIndexTaskTuningConfigTest
{
@@ -131,6 +133,121 @@ public class KinesisIndexTaskTuningConfigTest
}
@Test
+ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
+ {
+ KinesisIndexTaskTuningConfig base = new KinesisIndexTaskTuningConfig(
+ 1,
+ 3L,
+ 2,
+ 100L,
+ new Period("PT3S"),
+ new File("/tmp/xxx"),
+ 4,
+ new IndexSpec(),
+ true,
+ true,
+ 5L,
+ true,
+ false,
+ 1000,
+ 1000,
+ 500,
+ null,
+ 42,
+ null,
+ false,
+ 500,
+ 500,
+ 6000,
+ new Period("P3D")
+ );
+
+ String serialized = mapper.writeValueAsString(base);
+ TestModifiedKinesisIndexTaskTuningConfig deserialized =
+ mapper.readValue(serialized, TestModifiedKinesisIndexTaskTuningConfig.class);
+
+ Assert.assertEquals(null, deserialized.getExtra());
+ Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
+ Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
+ Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
+ Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows());
+ Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod());
+ Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory());
+ Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists());
+ Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
+ Assert.assertEquals(base.getBuildV9Directly(), deserialized.getBuildV9Directly());
+ Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions());
+ Assert.assertEquals(base.getHandoffConditionTimeout(), deserialized.getHandoffConditionTimeout());
+ Assert.assertEquals(base.isResetOffsetAutomatically(), deserialized.isResetOffsetAutomatically());
+ Assert.assertEquals(base.getSegmentWriteOutMediumFactory(), deserialized.getSegmentWriteOutMediumFactory());
+ Assert.assertEquals(base.getIntermediateHandoffPeriod(), deserialized.getIntermediateHandoffPeriod());
+ Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions());
+ Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
+ Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
+ Assert.assertEquals(base.getRecordBufferFullWait(), deserialized.getRecordBufferFullWait());
+ Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout());
+ Assert.assertEquals(base.getRecordBufferSize(), deserialized.getRecordBufferSize());
+ Assert.assertEquals(base.getMaxRecordsPerPoll(), deserialized.getMaxRecordsPerPoll());
+ }
+
+ @Test
+ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
+ {
+ KinesisIndexTaskTuningConfig base = new KinesisIndexTaskTuningConfig(
+ 1,
+ 3L,
+ 2,
+ 100L,
+ new Period("PT3S"),
+ new File("/tmp/xxx"),
+ 4,
+ new IndexSpec(),
+ true,
+ true,
+ 5L,
+ true,
+ false,
+ 1000,
+ 1000,
+ 500,
+ null,
+ 42,
+ null,
+ false,
+ 500,
+ 500,
+ 6000,
+ new Period("P3D")
+ );
+
+ String serialized = mapper.writeValueAsString(new TestModifiedKinesisIndexTaskTuningConfig(base, "loool"));
+ KinesisIndexTaskTuningConfig deserialized =
+ mapper.readValue(serialized, KinesisIndexTaskTuningConfig.class);
+
+ Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
+ Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
+ Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
+ Assert.assertEquals(base.getMaxTotalRows(), deserialized.getMaxTotalRows());
+ Assert.assertEquals(base.getIntermediatePersistPeriod(), deserialized.getIntermediatePersistPeriod());
+ Assert.assertEquals(base.getBasePersistDirectory(), deserialized.getBasePersistDirectory());
+ Assert.assertEquals(base.getMaxPendingPersists(), deserialized.getMaxPendingPersists());
+ Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
+ Assert.assertEquals(base.getBuildV9Directly(), deserialized.getBuildV9Directly());
+ Assert.assertEquals(base.isReportParseExceptions(), deserialized.isReportParseExceptions());
+ Assert.assertEquals(base.getHandoffConditionTimeout(), deserialized.getHandoffConditionTimeout());
+ Assert.assertEquals(base.isResetOffsetAutomatically(), deserialized.isResetOffsetAutomatically());
+ Assert.assertEquals(base.getSegmentWriteOutMediumFactory(), deserialized.getSegmentWriteOutMediumFactory());
+ Assert.assertEquals(base.getIntermediateHandoffPeriod(), deserialized.getIntermediateHandoffPeriod());
+ Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions());
+ Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
+ Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
+ Assert.assertEquals(base.getRecordBufferFullWait(), deserialized.getRecordBufferFullWait());
+ Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout());
+ Assert.assertEquals(base.getRecordBufferSize(), deserialized.getRecordBufferSize());
+ Assert.assertEquals(base.getMaxRecordsPerPoll(), deserialized.getMaxRecordsPerPoll());
+ }
+
+ @Test
public void testResetOffsetAndSkipSequenceNotBothTrue() throws Exception
{
String jsonStr = "{\n"
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index b42e356..cc5ae9e 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexing.kinesis.supervisor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -46,6 +45,7 @@ import org.apache.druid.indexing.kinesis.KinesisIndexTask;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskClient;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskClientFactory;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskIOConfig;
+import org.apache.druid.indexing.kinesis.KinesisIndexTaskTuningConfig;
import org.apache.druid.indexing.kinesis.KinesisRecordSupplier;
import org.apache.druid.indexing.kinesis.KinesisSequenceNumber;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
@@ -58,6 +58,7 @@ import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
@@ -68,6 +69,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
@@ -112,8 +114,6 @@ import static org.easymock.EasyMock.expectLastCall;
public class KinesisSupervisorTest extends EasyMockSupport
{
-
-
private static final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
private static final String DATASOURCE = "testDS";
private static final int TEST_CHAT_THREADS = 3;
@@ -209,7 +209,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testNoInitialState() throws Exception
{
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
supervisorRecordSupplier.assign(anyObject());
expectLastCall().anyTimes();
@@ -274,7 +274,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testMultiTask() throws Exception
{
- supervisor = getSupervisor(1, 2, true, "PT1H", null, null);
+ supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null);
supervisorRecordSupplier.assign(anyObject());
expectLastCall().anyTimes();
@@ -331,7 +331,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testReplicas() throws Exception
{
- supervisor = getSupervisor(2, 1, true, "PT1H", null, null);
+ supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);
supervisorRecordSupplier.assign(anyObject());
expectLastCall().anyTimes();
@@ -405,7 +405,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testLateMessageRejectionPeriod() throws Exception
{
- supervisor = getSupervisor(2, 1, true, "PT1H", new Period("PT1H"), null);
+ supervisor = getTestableSupervisor(2, 1, true, "PT1H", new Period("PT1H"), null);
supervisorRecordSupplier.assign(anyObject());
expectLastCall().anyTimes();
@@ -454,7 +454,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testEarlyMessageRejectionPeriod() throws Exception
{
- supervisor = getSupervisor(2, 1, true, "PT1H", null, new Period("PT1H"));
+ supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, new Period("PT1H"));
supervisorRecordSupplier.assign(anyObject());
expectLastCall().anyTimes();
@@ -508,7 +508,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
*/
public void testDatasourceMetadata() throws Exception
{
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
supervisorRecordSupplier.assign(anyObject());
expectLastCall().anyTimes();
@@ -529,7 +529,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KinesisDataSourceMetadata(
- new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "2", shardId0, "1"), ImmutableSet.of())
+ new SeekableStreamStartSequenceNumbers<>(
+ stream,
+ ImmutableMap.of(shardId1, "2", shardId0, "1"),
+ ImmutableSet.of()
+ )
)
).anyTimes();
@@ -556,7 +560,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test(expected = ISE.class)
public void testBadMetadataOffsets() throws Exception
{
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
supervisorRecordSupplier.assign(anyObject());
expectLastCall().anyTimes();
@@ -575,7 +579,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KinesisDataSourceMetadata(
- new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId1, "101", shardId0, "-1"), ImmutableSet.of())
+ new SeekableStreamStartSequenceNumbers<>(
+ stream,
+ ImmutableMap.of(shardId1, "101", shardId0, "-1"),
+ ImmutableSet.of()
+ )
)
).anyTimes();
replayAll();
@@ -585,9 +593,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
}
@Test
- public void testKillIncompatibleTasks() throws Exception
+ public void testDontKillTasksWithMismatchedDatasourceAndType() throws Exception
{
- supervisor = getSupervisor(2, 1, true, "PT1H", null, null);
+ supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);
supervisorRecordSupplier.assign(anyObject());
expectLastCall().anyTimes();
@@ -601,48 +609,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisorRecordSupplier.seek(anyObject(), anyString());
expectLastCall().anyTimes();
- // unexpected # of partitions (kill)
+ // different datasource (don't kill)
Task id1 = createKinesisIndexTask(
"id1",
- DATASOURCE,
- 1,
- new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "0"), ImmutableSet.of()),
- new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1")),
- null,
- null
- );
-
- // correct number of partitions and ranges (don't kill)
- Task id2 = createKinesisIndexTask(
- "id2",
- DATASOURCE,
- 0,
- new SeekableStreamStartSequenceNumbers<>(
- stream,
- ImmutableMap.of(shardId0, "0", shardId1, "0"), ImmutableSet.of(shardId0, shardId1)
- ),
- new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1", shardId1, "12")),
- null,
- null
- );
-
- // unexpected range on partition 2 (kill)
- Task id3 = createKinesisIndexTask(
- "id3",
- DATASOURCE,
- 1,
- new SeekableStreamStartSequenceNumbers<>(
- stream,
- ImmutableMap.of(shardId0, "0", shardId1, "1"), ImmutableSet.of(shardId0, shardId1)
- ),
- new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1", shardId1, "11")),
- null,
- null
- );
-
- // different datasource (don't kill)
- Task id4 = createKinesisIndexTask(
- "id4",
"other-datasource",
2,
new SeekableStreamStartSequenceNumbers<>(
@@ -656,8 +625,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
);
// non KinesisIndexTask (don't kill)
- Task id5 = new RealtimeIndexTask(
- "id5",
+ Task id2 = new RealtimeIndexTask(
+ "id2",
null,
new FireDepartment(
dataSchema,
@@ -667,18 +636,12 @@ public class KinesisSupervisorTest extends EasyMockSupport
null
);
- List<Task> existingTasks = ImmutableList.of(id1, id2, id3, id4, id5);
+ List<Task> existingTasks = ImmutableList.of(id1, id2);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes();
- EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
- EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
- EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
- EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
- EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
- EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(Status.NOT_STARTED))
.anyTimes();
@@ -690,24 +653,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
null
)
).anyTimes();
- EasyMock.expect(taskClient.stopAsync("id1", false)).andReturn(Futures.immediateFuture(true));
- EasyMock.expect(taskClient.stopAsync("id3", false)).andReturn(Futures.immediateFuture(false));
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
- taskQueue.shutdown("id3", "Task [%s] failed to stop in a timely manner, killing task", "id3");
-
- EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true);
-
- TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
- checkpoints.put(0, ImmutableMap.of(
- shardId0,
- "0",
- shardId1,
- "0"
- ));
- EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
- .andReturn(Futures.immediateFuture(checkpoints))
- .times(2);
+ EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true).times(2);
replayAll();
supervisor.start();
@@ -718,7 +666,14 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testKillBadPartitionAssignment() throws Exception
{
- supervisor = getSupervisor(1, 2, true, "PT1H", null, null);
+ supervisor = getTestableSupervisor(
+ 1,
+ 2,
+ true,
+ "PT1H",
+ null,
+ null
+ );
supervisorRecordSupplier.assign(anyObject());
expectLastCall().anyTimes();
@@ -771,17 +726,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null
);
- Task id5 = createKinesisIndexTask(
- "id5",
- DATASOURCE,
- 0,
- new SeekableStreamStartSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "0"), ImmutableSet.of(shardId0)),
- new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(shardId0, "1")),
- null,
- null
- );
- List<Task> existingTasks = ImmutableList.of(id1, id2, id3, id4, id5);
+ List<Task> existingTasks = ImmutableList.of(id1, id2, id3, id4);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
@@ -791,12 +737,10 @@ public class KinesisSupervisorTest extends EasyMockSupport
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
EasyMock.expect(taskStorage.getStatus("id4")).andReturn(Optional.of(TaskStatus.running("id4"))).anyTimes();
- EasyMock.expect(taskStorage.getStatus("id5")).andReturn(Optional.of(TaskStatus.running("id5"))).anyTimes();
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes();
EasyMock.expect(taskStorage.getTask("id4")).andReturn(Optional.of(id4)).anyTimes();
- EasyMock.expect(taskStorage.getTask("id5")).andReturn(Optional.of(id5)).anyTimes();
EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
.andReturn(Futures.immediateFuture(Status.NOT_STARTED))
.anyTimes();
@@ -809,24 +753,26 @@ public class KinesisSupervisorTest extends EasyMockSupport
)
).anyTimes();
EasyMock.expect(taskClient.stopAsync("id3", false)).andReturn(Futures.immediateFuture(true));
- EasyMock.expect(taskClient.stopAsync("id4", false)).andReturn(Futures.immediateFuture(false));
- EasyMock.expect(taskClient.stopAsync("id5", false)).andReturn(Futures.immediateFuture((Boolean) null));
TreeMap<Integer, Map<String, String>> checkpoints1 = new TreeMap<>();
checkpoints1.put(0, ImmutableMap.of(shardId1, "0"));
TreeMap<Integer, Map<String, String>> checkpoints2 = new TreeMap<>();
checkpoints2.put(0, ImmutableMap.of(shardId0, "0"));
+ TreeMap<Integer, Map<String, String>> checkpoints4 = new TreeMap<>();
+ checkpoints4.put(0, ImmutableMap.of(shardId0, "0"));
EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints1))
.times(1);
EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
.andReturn(Futures.immediateFuture(checkpoints2))
.times(1);
+ EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id4"), EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints4))
+ .times(1);
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
- taskQueue.shutdown("id4", "Task [%s] failed to stop in a timely manner, killing task", "id4");
- taskQueue.shutdown("id5", "Task [%s] failed to stop in a timely manner, killing task", "id5");
+ taskQueue.shutdown("id4", "Task [%s] failed to return status, killing task", "id4");
replayAll();
supervisor.start();
@@ -837,7 +783,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testRequeueTaskWhenFailed() throws Exception
{
- supervisor = getSupervisor(2, 2, true, "PT1H", null, null);
+ supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null);
supervisorRecordSupplier.assign(anyObject());
expectLastCall().anyTimes();
expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes();
@@ -946,7 +892,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testRequeueAdoptedTaskWhenFailed() throws Exception
{
- supervisor = getSupervisor(2, 1, true, "PT1H", null, null);
+ supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);
supervisorRecordSupplier.assign(anyObject());
expectLastCall().anyTimes();
expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes();
@@ -965,7 +911,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
"id1",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(shardId1, "0", shardId0, "0"), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ "stream",
+ ImmutableMap.of(shardId1, "0", shardId0, "0"),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
"stream",
ImmutableMap.of(
@@ -1076,7 +1026,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testQueueNextTasksOnSuccess() throws Exception
{
- supervisor = getSupervisor(2, 2, true, "PT1H", null, null);
+ supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null);
supervisorRecordSupplier.assign(anyObject());
expectLastCall().anyTimes();
expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes();
@@ -1197,7 +1147,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
{
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
- supervisor = getSupervisor(2, 2, true, "PT1M", null, null);
+ supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null);
supervisorRecordSupplier.assign(anyObject());
expectLastCall().anyTimes();
@@ -1333,7 +1283,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
{
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
supervisorRecordSupplier.assign(anyObject());
expectLastCall().anyTimes();
@@ -1352,7 +1302,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
"id1",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(shardId1, "0", shardId0, "0"), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ "stream",
+ ImmutableMap.of(shardId1, "0", shardId0, "0"),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
"stream",
ImmutableMap.of(
@@ -1482,7 +1436,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
{
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
supervisorRecordSupplier.assign(anyObject());
expectLastCall().anyTimes();
expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes();
@@ -1500,7 +1454,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
"id1",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(shardId1, "0", shardId0, "0"), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ "stream",
+ ImmutableMap.of(shardId1, "0", shardId0, "0"),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
"stream",
ImmutableMap.of(
@@ -1621,7 +1579,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
supervisorRecordSupplier.assign(anyObject());
expectLastCall().anyTimes();
@@ -1791,7 +1749,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception
{
- supervisor = getSupervisor(2, 2, true, "PT1H", null, null);
+ supervisor = getTestableSupervisor(2, 2, true, "PT1H", null, null);
supervisorRecordSupplier.assign(anyObject());
expectLastCall().anyTimes();
expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes();
@@ -1870,7 +1828,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
{
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
- supervisor = getSupervisor(2, 2, true, "PT1M", null, null);
+ supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null);
supervisorRecordSupplier.assign(anyObject());
expectLastCall().anyTimes();
expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes();
@@ -1975,7 +1933,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
{
final TaskLocation location = new TaskLocation("testHost", 1234, -1);
- supervisor = getSupervisor(2, 2, true, "PT1M", null, null);
+ supervisor = getTestableSupervisor(2, 2, true, "PT1M", null, null);
supervisorRecordSupplier.assign(anyObject());
expectLastCall().anyTimes();
expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes();
@@ -2091,7 +2049,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test(expected = IllegalStateException.class)
public void testStopNotStarted()
{
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
supervisor.stop(false);
}
@@ -2106,7 +2064,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
taskRunner.unregisterListener(StringUtils.format("KinesisSupervisor-%s", DATASOURCE));
replayAll();
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
supervisor.start();
supervisor.stop(false);
@@ -2120,7 +2078,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
- supervisor = getSupervisor(2, 1, true, "PT1H", null, null);
+ supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);
supervisorRecordSupplier.assign(anyObject());
expectLastCall().anyTimes();
expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes();
@@ -2138,7 +2096,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
"id1",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(shardId1, "0", shardId0, "0"), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ "stream",
+ ImmutableMap.of(shardId1, "0", shardId0, "0"),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
"stream",
ImmutableMap.of(
@@ -2296,7 +2258,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
replayAll();
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
supervisor.start();
@@ -2316,7 +2278,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
public void testResetDataSourceMetadata() throws Exception
{
expect(supervisorRecordSupplier.getPartitionIds(anyObject())).andReturn(Collections.emptySet()).anyTimes();
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
@@ -2393,7 +2355,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
public void testResetNoDataSourceMetadata() throws Exception
{
expect(supervisorRecordSupplier.getPartitionIds(anyObject())).andReturn(Collections.emptySet()).anyTimes();
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
@@ -2429,7 +2391,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
- supervisor = getSupervisor(2, 1, true, "PT1H", null, null);
+ supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);
supervisorRecordSupplier.assign(anyObject());
expectLastCall().anyTimes();
expect(supervisorRecordSupplier.getPartitionIds(stream)).andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes();
@@ -2447,7 +2409,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
"id1",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(shardId1, "0", shardId0, "0"), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ "stream",
+ ImmutableMap.of(shardId1, "0", shardId0, "0"),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
"stream",
ImmutableMap.of(
@@ -2580,7 +2546,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
public void testNoDataIngestionTasks() throws Exception
{
final DateTime startTime = DateTimes.nowUtc();
- supervisor = getSupervisor(2, 1, true, "PT1H", null, null);
+ supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null);
//not adding any events
Task id1 = createKinesisIndexTask(
@@ -2725,7 +2691,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
public void testCheckpointForInactiveTaskGroup()
throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
{
- supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+ supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null, false);
//not adding any events
final Task id1;
id1 = createKinesisIndexTask(
@@ -2863,7 +2829,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
new KinesisDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(stream, checkpoints.get(0), checkpoints.get(0).keySet())
),
- new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(stream, fakeCheckpoints, ImmutableSet.of()))
+ new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
+ stream,
+ fakeCheckpoints,
+ ImmutableSet.of()
+ ))
);
while (supervisor.getNoticesQueueSize() > 0) {
@@ -2881,7 +2851,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
public void testCheckpointForUnknownTaskGroup()
throws InterruptedException
{
- supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false);
+ supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null, false);
supervisorRecordSupplier.assign(anyObject());
@@ -2985,8 +2955,16 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisor.checkpoint(
0,
((KinesisIndexTask) id1).getIOConfig().getBaseSequenceName(),
- new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(stream, Collections.emptyMap(), ImmutableSet.of())),
- new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(stream, Collections.emptyMap(), ImmutableSet.of()))
+ new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
+ stream,
+ Collections.emptyMap(),
+ ImmutableSet.of()
+ )),
+ new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
+ stream,
+ Collections.emptyMap(),
+ ImmutableSet.of()
+ ))
);
while (supervisor.getNoticesQueueSize() > 0) {
@@ -3012,7 +2990,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
public void testCheckpointWithNullTaskGroupId()
throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException
{
- supervisor = getSupervisor(1, 3, true, "PT1S", null, null, false);
+ supervisor = getTestableSupervisor(1, 3, true, "PT1S", null, null, false);
//not adding any events
final Task id1 = createKinesisIndexTask(
"id1",
@@ -3101,8 +3079,16 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisor.checkpoint(
null,
((KinesisIndexTask) id1).getIOConfig().getBaseSequenceName(),
- new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(stream, checkpoints.get(0), ImmutableSet.of())),
- new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(stream, newCheckpoints.get(0), ImmutableSet.of()))
+ new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
+ stream,
+ checkpoints.get(0),
+ ImmutableSet.of()
+ )),
+ new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
+ stream,
+ newCheckpoints.get(0),
+ ImmutableSet.of()
+ ))
);
while (supervisor.getNoticesQueueSize() > 0) {
@@ -3116,7 +3102,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testSuspendedNoRunningTasks() throws Exception
{
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null, true);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, true);
expect(supervisorRecordSupplier.getPartitionIds(anyObject())).andReturn(Collections.emptySet()).anyTimes();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
@@ -3149,7 +3135,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
final TaskLocation location2 = new TaskLocation("testHost2", 145, -1);
final DateTime startTime = DateTimes.nowUtc();
- supervisor = getSupervisor(2, 1, true, "PT1H", null, null, true);
+ supervisor = getTestableSupervisor(2, 1, true, "PT1H", null, null, true);
supervisorRecordSupplier.assign(anyObject());
expectLastCall().anyTimes();
@@ -3169,7 +3155,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
"id1",
DATASOURCE,
0,
- new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(shardId1, "0", shardId0, "0"), ImmutableSet.of()),
+ new SeekableStreamStartSequenceNumbers<>(
+ "stream",
+ ImmutableMap.of(shardId1, "0", shardId0, "0"),
+ ImmutableSet.of()
+ ),
new SeekableStreamEndSequenceNumbers<>(
"stream",
ImmutableMap.of(
@@ -3314,7 +3304,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
replayAll();
- supervisor = getSupervisor(1, 1, true, "PT1H", null, null, true);
+ supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, true);
supervisor.start();
supervisor.runInternal();
verifyAll();
@@ -3331,7 +3321,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
@Test
public void testGetCurrentTotalStats()
{
- supervisor = getSupervisor(1, 2, true, "PT1H", null, null, false);
+ supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null, false);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "0"),
@@ -3372,81 +3362,393 @@ public class KinesisSupervisorTest extends EasyMockSupport
Assert.assertEquals(ImmutableMap.of("task2", ImmutableMap.of("prop2", "val2")), stats.get("1"));
}
- private KinesisSupervisor getSupervisor(
- int replicas,
- int taskCount,
- boolean useEarliestOffset,
- String duration,
- Period lateMessageRejectionPeriod,
- Period earlyMessageRejectionPeriod
- )
+ @Test
+ public void testDoNotKillCompatibleTasks()
+ throws InterruptedException, EntryExistsException, ExecutionException, TimeoutException, JsonProcessingException
{
- return getSupervisor(
- replicas,
- taskCount,
- useEarliestOffset,
- duration,
- lateMessageRejectionPeriod,
- earlyMessageRejectionPeriod,
+ // This supervisor always returns true for isTaskCurrent -> it should not kill its tasks
+ int numReplicas = 2;
+ supervisor = getTestableSupervisorCustomIsTaskCurrent(
+ numReplicas,
+ 1,
+ true,
+ "PT1H",
+ new Period("P1D"),
+ new Period("P1D"),
false,
+ 42,
+ 1000,
+ true
+ );
+
+ supervisorRecordSupplier.assign(EasyMock.anyObject());
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.expect(supervisorRecordSupplier.getPartitionIds(stream))
+ .andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes();
+ EasyMock.expect(supervisorRecordSupplier.getAssignment())
+ .andReturn(ImmutableSet.of(shard1Partition, shard0Partition))
+ .anyTimes();
+ supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
+ EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
+ supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
+ EasyMock.expectLastCall().anyTimes();
+
+ Task task = createKinesisIndexTask(
+ "id2",
+ DATASOURCE,
+ 0,
+ new SeekableStreamStartSequenceNumbers<>(
+ stream,
+ ImmutableMap.of(
+ shardId0,
+ "0",
+ shardId1,
+ "0"
+ ),
+ ImmutableSet.of()
+ ),
+ new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(
+ shardId0,
+ "1",
+ shardId1,
+ "12"
+ )),
null,
null
);
- }
- private KinesisSupervisor getSupervisor(
- int replicas,
- int taskCount,
- boolean useEarliestOffset,
- String duration,
- Period lateMessageRejectionPeriod,
- Period earlyMessageRejectionPeriod,
- boolean suspended,
- Integer recordsPerFetch,
- Integer fetchDelayMillis
+ List<Task> existingTasks = ImmutableList.of(task);
- )
+ EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+ EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
+ EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes();
+ EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+ EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(task)).anyTimes();
+ EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
+ .andReturn(Futures.immediateFuture(Status.NOT_STARTED))
+ .anyTimes();
+ EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
+ .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
+ .anyTimes();
+ EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+ new KinesisDataSourceMetadata(
+ null
+ )
+ ).anyTimes();
+
+ taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
+ EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true);
+
+ TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
+ checkpoints.put(0, ImmutableMap.of(
+ shardId0,
+ "0",
+ shardId1,
+ "0"
+ ));
+
+ EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints))
+ .times(numReplicas);
+
+ replayAll();
+ supervisor.start();
+ supervisor.runInternal();
+ verifyAll();
+ }
+
+ @Test
+ public void testKillIncompatibleTasks()
+ throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException, EntryExistsException
{
- KinesisSupervisorIOConfig KinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
- stream,
- "awsEndpoint",
- null,
- replicas,
- taskCount,
- new Period(duration),
+ // This supervisor always returns false for isTaskCurrent -> it should kill its tasks
+ int numReplicas = 2;
+ supervisor = getTestableSupervisorCustomIsTaskCurrent(
+ numReplicas,
+ 1,
+ true,
+ "PT1H",
new Period("P1D"),
- new Period("PT30S"),
- useEarliestOffset,
- new Period("PT30M"),
- lateMessageRejectionPeriod,
- earlyMessageRejectionPeriod,
- recordsPerFetch,
- fetchDelayMillis,
- null,
- null,
+ new Period("P1D"),
+ false,
+ 42,
+ 1000,
false
);
+ supervisorRecordSupplier.assign(EasyMock.anyObject());
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.expect(supervisorRecordSupplier.getPartitionIds(stream))
+ .andReturn(ImmutableSet.of(shardId1, shardId0)).anyTimes();
+ EasyMock.expect(supervisorRecordSupplier.getAssignment())
+ .andReturn(ImmutableSet.of(shard1Partition, shard0Partition))
+ .anyTimes();
+ supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
+ EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes();
+ supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
+ EasyMock.expectLastCall().anyTimes();
- KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory(
+ Task task = createKinesisIndexTask(
+ "id1",
+ DATASOURCE,
+ 0,
+ new SeekableStreamStartSequenceNumbers<>(
+ stream,
+ ImmutableMap.of(
+ shardId0,
+ "0",
+ shardId1,
+ "0"
+ ),
+ ImmutableSet.of()
+ ),
+ new SeekableStreamEndSequenceNumbers<>(stream, ImmutableMap.of(
+ shardId0,
+ "1",
+ shardId1,
+ "12"
+ )),
null,
null
- )
- {
- @Override
- public KinesisIndexTaskClient build(
- TaskInfoProvider taskInfoProvider,
- String dataSource,
- int numThreads,
- Duration httpTimeout,
- long numRetries
- )
- {
- Assert.assertEquals(TEST_CHAT_THREADS, numThreads);
- Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout);
- Assert.assertEquals(TEST_CHAT_RETRIES, numRetries);
- return taskClient;
- }
- };
+ );
+
+ List<Task> existingTasks = ImmutableList.of(task);
+
+ EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+ EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
+ EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes();
+ EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+ EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes();
+ EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
+ .andReturn(Futures.immediateFuture(Status.NOT_STARTED))
+ .anyTimes();
+ EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
+ .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
+ .anyTimes();
+ EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+ new KinesisDataSourceMetadata(
+ null
+ )
+ ).anyTimes();
+ EasyMock.expect(taskClient.stopAsync("id1", false)).andReturn(Futures.immediateFuture(true));
+ taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
+ EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true).times(2);
+
+ replayAll();
+ supervisor.start();
+ supervisor.runInternal();
+ verifyAll();
+ }
+
+ /**
+ * Checks isTaskCurrent() for task group 42. The task whose data schema, tuning config and starting
+ * sequence numbers all match the supervisor ("id1") is current. Tasks that differ in exactly one of
+ * those (data schema "id2", tuning config "id3", starting sequence number "id4") are not current.
+ */
+ @Test
+ public void testIsTaskCurrent()
+ {
+ DateTime minMessageTime = DateTimes.nowUtc();
+ DateTime maxMessageTime = DateTimes.nowUtc().plus(10000);
+
+ // getSupervisor() is used here so that generateSequenceName is not overridden.
+ KinesisSupervisor supervisor = getSupervisor(
+ 1,
+ 1,
+ true,
+ "PT1H",
+ new Period("P1D"),
+ new Period("P1D"),
+ false,
+ 42,
+ 42,
+ dataSchema,
+ tuningConfig
+ );
+
+ // Task group 42 reads shardId1 starting at "3" and owns tasks id1..id4.
+ supervisor.addTaskGroupToActivelyReadingTaskGroup(
+ 42,
+ ImmutableMap.of(shardId1, "3"),
+ Optional.of(minMessageTime),
+ Optional.of(maxMessageTime),
+ ImmutableSet.of("id1", "id2", "id3", "id4"),
+ ImmutableSet.of()
+ );
+
+ DataSchema modifiedDataSchema = getDataSchema("some other datasource");
+
+ KinesisSupervisorTuningConfig modifiedTuningConfig = new KinesisSupervisorTuningConfig(
+ 1000,
+ null,
+ 50000,
+ null,
+ new Period("P1Y"),
+ new File("/test"),
+ null,
+ null,
+ true,
+ false,
+ null,
+ null,
+ null,
+ null,
+ numThreads,
+ TEST_CHAT_THREADS,
+ TEST_CHAT_RETRIES,
+ TEST_HTTP_TIMEOUT,
+ TEST_SHUTDOWN_TIMEOUT,
+ null,
+ null,
+ null,
+ 5000,
+ null,
+ null,
+ null,
+ null,
+ 42, // This property is different from tuningConfig
+ null
+ );
+
+ // Matches the task group on every input that feeds the sequence name: expected to be current.
+ KinesisIndexTask taskFromStorage = createKinesisIndexTask(
+ "id1",
+ 0,
+ new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(
+ shardId1,
+ "3"
+ ), ImmutableSet.of()),
+ new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of(
+ shardId1,
+ KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
+ )),
+ minMessageTime,
+ maxMessageTime,
+ dataSchema
+ );
+
+ // Differs only in data schema (different datasource).
+ KinesisIndexTask taskFromStorageMismatchedDataSchema = createKinesisIndexTask(
+ "id2",
+ 0,
+ new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(
+ shardId1,
+ "3"
+ ), ImmutableSet.of()),
+ new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of(
+ shardId1,
+ KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
+ )),
+ minMessageTime,
+ maxMessageTime,
+ modifiedDataSchema
+ );
+
+ // Differs only in tuning config.
+ KinesisIndexTask taskFromStorageMismatchedTuningConfig = createKinesisIndexTask(
+ "id3",
+ 0,
+ new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(
+ shardId1,
+ "3"
+ ), ImmutableSet.of()),
+ new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of(
+ shardId1,
+ KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
+ )),
+ minMessageTime,
+ maxMessageTime,
+ dataSchema,
+ modifiedTuningConfig
+ );
+
+ // Differs only in the starting sequence number of shardId1.
+ KinesisIndexTask taskFromStorageMismatchedPartitionsWithTaskGroup = createKinesisIndexTask(
+ "id4",
+ 0,
+ new SeekableStreamStartSequenceNumbers<>(
+ "stream",
+ ImmutableMap.of(
+ shardId1,
+ "4" // this is the mismatch
+ ),
+ ImmutableSet.of()
+ ),
+ new SeekableStreamEndSequenceNumbers<>("stream", ImmutableMap.of(
+ shardId1,
+ KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
+ )),
+ minMessageTime,
+ maxMessageTime,
+ dataSchema
+ );
+
+ // isTaskCurrent() loads each task from storage once; verifyAll() enforces exactly one lookup per id.
+ EasyMock.expect(taskStorage.getTask("id1"))
+ .andReturn(Optional.of(taskFromStorage))
+ .once();
+ EasyMock.expect(taskStorage.getTask("id2"))
+ .andReturn(Optional.of(taskFromStorageMismatchedDataSchema))
+ .once();
+ EasyMock.expect(taskStorage.getTask("id3"))
+ .andReturn(Optional.of(taskFromStorageMismatchedTuningConfig))
+ .once();
+ EasyMock.expect(taskStorage.getTask("id4"))
+ .andReturn(Optional.of(taskFromStorageMismatchedPartitionsWithTaskGroup))
+ .once();
+
+ replayAll();
+
+ Assert.assertTrue(supervisor.isTaskCurrent(42, "id1"));
+ Assert.assertFalse(supervisor.isTaskCurrent(42, "id2"));
+ Assert.assertFalse(supervisor.isTaskCurrent(42, "id3"));
+ Assert.assertFalse(supervisor.isTaskCurrent(42, "id4"));
+ verifyAll();
+ }
+
+ private KinesisSupervisor getTestableSupervisor(
+ int replicas,
+ int taskCount,
+ boolean useEarliestOffset,
+ String duration,
+ Period lateMessageRejectionPeriod,
+ Period earlyMessageRejectionPeriod,
+ boolean suspended
+ )
+ {
+ KinesisSupervisorIOConfig KinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
+ stream,
+ "awsEndpoint",
+ null,
+ replicas,
+ taskCount,
+ new Period(duration),
+ new Period("P1D"),
+ new Period("PT30S"),
+ useEarliestOffset,
+ new Period("PT30M"),
+ lateMessageRejectionPeriod,
+ earlyMessageRejectionPeriod,
+ null,
+ null,
+ null,
+ null,
+ false
+ );
+
+ KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory(
+ null,
+ null
+ )
+ {
+ @Override
+ public KinesisIndexTaskClient build(
+ TaskInfoProvider taskInfoProvider,
+ String dataSource,
+ int numThreads,
+ Duration httpTimeout,
+ long numRetries
+ )
+ {
+ Assert.assertEquals(TEST_CHAT_THREADS, numThreads);
+ Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout);
+ Assert.assertEquals(TEST_CHAT_RETRIES, numRetries);
+ return taskClient;
+ }
+ };
return new TestableKinesisSupervisor(
taskStorage,
@@ -3474,105 +3776,194 @@ public class KinesisSupervisorTest extends EasyMockSupport
);
}
- private static DataSchema getDataSchema(String dataSource)
+ private KinesisSupervisor getTestableSupervisor(
+ int replicas,
+ int taskCount,
+ boolean useEarliestOffset,
+ String duration,
+ Period lateMessageRejectionPeriod,
+ Period earlyMessageRejectionPeriod
+ )
{
- List<DimensionSchema> dimensions = new ArrayList<>();
- dimensions.add(StringDimensionSchema.create("dim1"));
- dimensions.add(StringDimensionSchema.create("dim2"));
-
- return new DataSchema(
- dataSource,
- objectMapper.convertValue(
- new StringInputRowParser(
- new JSONParseSpec(
- new TimestampSpec("timestamp", "iso", null),
- new DimensionsSpec(
- dimensions,
- null,
- null
- ),
- new JSONPathSpec(true, ImmutableList.of()),
- ImmutableMap.of()
- ),
- StandardCharsets.UTF_8.name()
- ),
- Map.class
- ),
- new AggregatorFactory[]{new CountAggregatorFactory("rows")},
- new UniformGranularitySpec(
- Granularities.HOUR,
- Granularities.NONE,
- ImmutableList.of()
- ),
+ return getTestableSupervisor(
+ replicas,
+ taskCount,
+ useEarliestOffset,
+ duration,
+ lateMessageRejectionPeriod,
+ earlyMessageRejectionPeriod,
+ false,
null,
- objectMapper
+ null
);
}
- private static List<byte[]> JB(
- String timestamp,
- String dim1,
- String dim2,
- String dimLong,
- String dimFloat,
- String met1
+ private KinesisSupervisor getTestableSupervisor(
+ int replicas,
+ int taskCount,
+ boolean useEarliestOffset,
+ String duration,
+ Period lateMessageRejectionPeriod,
+ Period earlyMessageRejectionPeriod,
+ boolean suspended,
+ Integer recordsPerFetch,
+ Integer fetchDelayMillis
)
{
- try {
- return Collections.singletonList(new ObjectMapper().writeValueAsBytes(
- ImmutableMap.builder()
- .put("timestamp", timestamp)
- .put("dim1", dim1)
- .put("dim2", dim2)
- .put("dimLong", dimLong)
- .put("dimFloat", dimFloat)
- .put("met1", met1)
- .build()
- ));
- }
- catch (Exception e) {
- throw Throwables.propagate(e);
- }
+ KinesisSupervisorIOConfig KinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
+ stream,
+ "awsEndpoint",
+ null,
+ replicas,
+ taskCount,
+ new Period(duration),
+ new Period("P1D"),
+ new Period("PT30S"),
+ useEarliestOffset,
+ new Period("PT30M"),
+ lateMessageRejectionPeriod,
+ earlyMessageRejectionPeriod,
+ recordsPerFetch,
+ fetchDelayMillis,
+ null,
+ null,
+ false
+ );
+
+ KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory(
+ null,
+ null
+ )
+ {
+ @Override
+ public KinesisIndexTaskClient build(
+ TaskInfoProvider taskInfoProvider,
+ String dataSource,
+ int numThreads,
+ Duration httpTimeout,
+ long numRetries
+ )
+ {
+ Assert.assertEquals(TEST_CHAT_THREADS, numThreads);
+ Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout);
+ Assert.assertEquals(TEST_CHAT_RETRIES, numRetries);
+ return taskClient;
+ }
+ };
+
+ return new TestableKinesisSupervisor(
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ taskClientFactory,
+ objectMapper,
+ new KinesisSupervisorSpec(
+ dataSchema,
+ tuningConfig,
+ KinesisSupervisorIOConfig,
+ null,
+ suspended,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ taskClientFactory,
+ objectMapper,
+ new NoopServiceEmitter(),
+ new DruidMonitorSchedulerConfig(),
+ rowIngestionMetersFactory,
+ null
+ ),
+ rowIngestionMetersFactory
+ );
}
- private KinesisIndexTask createKinesisIndexTask(
- String id,
- String dataSource,
- int taskGroupId,
- SeekableStreamStartSequenceNumbers<String, String> startPartitions,
- SeekableStreamEndSequenceNumbers<String, String> endPartitions,
- DateTime minimumMessageTime,
- DateTime maximumMessageTime
+ /**
+ * Use when you want to mock the return value of SeekableStreamSupervisor#isTaskCurrent()
+ */
+ private KinesisSupervisor getTestableSupervisorCustomIsTaskCurrent(
+ int replicas,
+ int taskCount,
+ boolean useEarliestOffset,
+ String duration,
+ Period lateMessageRejectionPeriod,
+ Period earlyMessageRejectionPeriod,
+ boolean suspended,
+ Integer recordsPerFetch,
+ Integer fetchDelayMillis,
+ boolean isTaskCurrentReturn
)
{
- return new KinesisIndexTask(
- id,
+ KinesisSupervisorIOConfig KinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
+ stream,
+ "awsEndpoint",
null,
- getDataSchema(dataSource),
- tuningConfig,
- new KinesisIndexTaskIOConfig(
- null,
- "sequenceName-" + taskGroupId,
- startPartitions,
- endPartitions,
- true,
- minimumMessageTime,
- maximumMessageTime,
- "awsEndpoint",
- null,
- null,
- null,
- null,
- false
- ),
- Collections.emptyMap(),
+ replicas,
+ taskCount,
+ new Period(duration),
+ new Period("P1D"),
+ new Period("PT30S"),
+ useEarliestOffset,
+ new Period("PT30M"),
+ lateMessageRejectionPeriod,
+ earlyMessageRejectionPeriod,
+ recordsPerFetch,
+ fetchDelayMillis,
null,
null,
- rowIngestionMetersFactory,
+ false
+ );
+
+ KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory(
+ null,
null
+ )
+ {
+ @Override
+ public KinesisIndexTaskClient build(
+ TaskInfoProvider taskInfoProvider,
+ String dataSource,
+ int numThreads,
+ Duration httpTimeout,
+ long numRetries
+ )
+ {
+ Assert.assertEquals(TEST_CHAT_THREADS, numThreads);
+ Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout);
+ Assert.assertEquals(TEST_CHAT_RETRIES, numRetries);
+ return taskClient;
+ }
+ };
+
+ return new TestableKinesisSupervisorWithCustomIsTaskCurrent(
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ taskClientFactory,
+ objectMapper,
+ new KinesisSupervisorSpec(
+ dataSchema,
+ tuningConfig,
+ KinesisSupervisorIOConfig,
+ null,
+ suspended,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ taskClientFactory,
+ objectMapper,
+ new NoopServiceEmitter(),
+ new DruidMonitorSchedulerConfig(),
+ rowIngestionMetersFactory,
+ null
+ ),
+ rowIngestionMetersFactory,
+ isTaskCurrentReturn
);
}
+ /**
+ * Use for tests where you don't want generateSequenceName to be overridden.
+ */
private KinesisSupervisor getSupervisor(
int replicas,
int taskCount,
@@ -3580,7 +3971,11 @@ public class KinesisSupervisorTest extends EasyMockSupport
String duration,
Period lateMessageRejectionPeriod,
Period earlyMessageRejectionPeriod,
- boolean suspended
+ boolean suspended,
+ Integer recordsPerFetch,
+ Integer fetchDelayMillis,
+ DataSchema dataSchema,
+ KinesisSupervisorTuningConfig tuningConfig
)
{
KinesisSupervisorIOConfig KinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
@@ -3596,8 +3991,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
new Period("PT30M"),
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
- null,
- null,
+ recordsPerFetch,
+ fetchDelayMillis,
null,
null,
false
@@ -3624,7 +4019,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
}
};
- return new TestableKinesisSupervisor(
+ return new KinesisSupervisor(
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
@@ -3646,7 +4041,126 @@ public class KinesisSupervisorTest extends EasyMockSupport
rowIngestionMetersFactory,
null
),
- rowIngestionMetersFactory
+ rowIngestionMetersFactory,
+ null
+ );
+ }
+
+  /**
+   * Builds the DataSchema used by these tests for {@code dataSource}.
+   *
+   * The schema has two string dimensions ("dim1", "dim2"). Timestamps are read from the "timestamp"
+   * JSON field in ISO format. It has a single "rows" count aggregator and HOUR segment / NONE query
+   * granularity.
+   */
+  private static DataSchema getDataSchema(String dataSource)
+  {
+    final List<DimensionSchema> dimensionSchemas = new ArrayList<>();
+    for (String dimensionName : new String[]{"dim1", "dim2"}) {
+      dimensionSchemas.add(StringDimensionSchema.create(dimensionName));
+    }
+
+    final StringInputRowParser rowParser = new StringInputRowParser(
+        new JSONParseSpec(
+            new TimestampSpec("timestamp", "iso", null),
+            new DimensionsSpec(dimensionSchemas, null, null),
+            new JSONPathSpec(true, ImmutableList.of()),
+            ImmutableMap.of()
+        ),
+        StandardCharsets.UTF_8.name()
+    );
+
+    final AggregatorFactory[] aggregators = {new CountAggregatorFactory("rows")};
+    final UniformGranularitySpec granularitySpec = new UniformGranularitySpec(
+        Granularities.HOUR,
+        Granularities.NONE,
+        ImmutableList.of()
+    );
+
+    // The parser is handed to DataSchema as a Map, converted with the shared objectMapper.
+    return new DataSchema(
+        dataSource,
+        objectMapper.convertValue(rowParser, Map.class),
+        aggregators,
+        granularitySpec,
+        null,
+        objectMapper
+    );
+  }
+
+
+  /**
+   * Creates a test KinesisIndexTask whose data schema is the default test schema for {@code dataSource}.
+   * All other settings come from the DataSchema-taking overload.
+   */
+  private KinesisIndexTask createKinesisIndexTask(
+      String id,
+      String dataSource,
+      int taskGroupId,
+      SeekableStreamStartSequenceNumbers<String, String> startPartitions,
+      SeekableStreamEndSequenceNumbers<String, String> endPartitions,
+      DateTime minimumMessageTime,
+      DateTime maximumMessageTime
+  )
+  {
+    final DataSchema schemaForDataSource = getDataSchema(dataSource);
+    return createKinesisIndexTask(
+        id,
+        taskGroupId,
+        startPartitions,
+        endPartitions,
+        minimumMessageTime,
+        maximumMessageTime,
+        schemaForDataSource
+    );
+  }
+
+  /**
+   * Creates a test KinesisIndexTask with the given data schema. Its task tuning config is derived from
+   * the test's supervisor {@code tuningConfig} field via {@code convertToTaskTuningConfig()}.
+   */
+  private KinesisIndexTask createKinesisIndexTask(
+      String id,
+      int taskGroupId,
+      SeekableStreamStartSequenceNumbers<String, String> startPartitions,
+      SeekableStreamEndSequenceNumbers<String, String> endPartitions,
+      DateTime minimumMessageTime,
+      DateTime maximumMessageTime,
+      DataSchema dataSchema
+  )
+  {
+    final KinesisIndexTaskTuningConfig taskTuningConfig =
+        (KinesisIndexTaskTuningConfig) tuningConfig.convertToTaskTuningConfig();
+    return createKinesisIndexTask(
+        id,
+        taskGroupId,
+        startPartitions,
+        endPartitions,
+        minimumMessageTime,
+        maximumMessageTime,
+        dataSchema,
+        taskTuningConfig
+    );
+  }
+
+ /**
+ * Creates a test KinesisIndexTask with an explicit data schema and task tuning config.
+ * The task's base sequence name is "sequenceName-" + taskGroupId, and it uses the "awsEndpoint" endpoint.
+ * Note: the {@code tuningConfig} parameter shadows the test class's supervisor tuningConfig field.
+ */
+ private KinesisIndexTask createKinesisIndexTask(
+ String id,
+ int taskGroupId,
+ SeekableStreamStartSequenceNumbers<String, String> startPartitions,
+ SeekableStreamEndSequenceNumbers<String, String> endPartitions,
+ DateTime minimumMessageTime,
+ DateTime maximumMessageTime,
+ DataSchema dataSchema,
+ KinesisIndexTaskTuningConfig tuningConfig
+ )
+ {
+ return new KinesisIndexTask(
+ id,
+ null,
+ dataSchema,
+ tuningConfig,
+ new KinesisIndexTaskIOConfig(
+ // NOTE(review): the IOConfig task group ID is hard-coded to 0 even though taskGroupId is used in the
+ // sequence name below; confirm this is intended for callers that pass a non-zero taskGroupId.
+ 0,
+ "sequenceName-" + taskGroupId,
+ startPartitions,
+ endPartitions,
+ true,
+ minimumMessageTime,
+ maximumMessageTime,
+ "awsEndpoint",
+ null,
+ null,
+ null,
+ null,
+ false
+ ),
+ Collections.emptyMap(),
+ null,
+ null,
+ rowIngestionMetersFactory,
+ null
+ );
}
@@ -3715,7 +4229,9 @@ public class KinesisSupervisorTest extends EasyMockSupport
protected String generateSequenceName(
Map<String, String> startPartitions,
Optional<DateTime> minimumMessageTime,
- Optional<DateTime> maximumMessageTime
+ Optional<DateTime> maximumMessageTime,
+ DataSchema dataSchema,
+ SeekableStreamIndexTaskTuningConfig tuningConfig
)
{
final int groupId = getTaskGroupIdForPartition(startPartitions.keySet().iterator().next());
@@ -3727,7 +4243,39 @@ public class KinesisSupervisorTest extends EasyMockSupport
{
return supervisorRecordSupplier;
}
+ }
+  /**
+   * A {@link TestableKinesisSupervisor} whose {@code isTaskCurrent} always returns a fixed value supplied
+   * at construction. Tests use it to control whether existing tasks are treated as current.
+   */
+  private class TestableKinesisSupervisorWithCustomIsTaskCurrent extends TestableKinesisSupervisor
+  {
+    // Fixed once in the constructor; returned for every isTaskCurrent() call.
+    private final boolean isTaskCurrentReturn;
+
+    public TestableKinesisSupervisorWithCustomIsTaskCurrent(
+        TaskStorage taskStorage,
+        TaskMaster taskMaster,
+        IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
+        KinesisIndexTaskClientFactory taskClientFactory,
+        ObjectMapper mapper,
+        KinesisSupervisorSpec spec,
+        RowIngestionMetersFactory rowIngestionMetersFactory,
+        boolean isTaskCurrentReturn
+    )
+    {
+      super(
+          taskStorage,
+          taskMaster,
+          indexerMetadataStorageCoordinator,
+          taskClientFactory,
+          mapper,
+          spec,
+          rowIngestionMetersFactory
+      );
+      this.isTaskCurrentReturn = isTaskCurrentReturn;
+    }
+
+    /**
+     * Ignores both arguments and returns the value given to the constructor.
+     */
+    @Override
+    public boolean isTaskCurrent(int taskGroupId, String taskId)
+    {
+      return isTaskCurrentReturn;
+    }
}
}
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
new file mode 100644
index 0000000..2485e97
--- /dev/null
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kinesis.test;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.indexing.kinesis.KinesisIndexTaskTuningConfig;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.io.File;
+
+/**
+ * Test-only subclass of KinesisIndexTaskTuningConfig that adds one extra JSON property, "extra".
+ * It is declared under the JSON type name "KinesisTuningConfig". Presumably it simulates a tuning
+ * config class from a different version (one that has an additional field) when testing task adoption
+ * across versions. TODO confirm against its callers.
+ */
+@JsonTypeName("KinesisTuningConfig")
+public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTuningConfig
+{
+ // Additional property not present on the parent class; serialized as "extra".
+ private final String extra;
+
+ /**
+ * Jackson creator. Every argument except {@code extra} is passed through to the parent constructor
+ * in the same order.
+ */
+ @JsonCreator
+ public TestModifiedKinesisIndexTaskTuningConfig(
+ @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
+ @JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
+ @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
+ @JsonProperty("maxTotalRows") Long maxTotalRows,
+ @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
+ @JsonProperty("basePersistDirectory") File basePersistDirectory,
+ @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
+ @JsonProperty("indexSpec") IndexSpec indexSpec,
+ @JsonProperty("buildV9Directly") Boolean buildV9Directly,
+ @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
+ @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
+ @JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically,
+ @JsonProperty("skipSequenceNumberAvailabilityCheck") Boolean skipSequenceNumberAvailabilityCheck,
+ @JsonProperty("recordBufferSize") Integer recordBufferSize,
+ @JsonProperty("recordBufferOfferTimeout") Integer recordBufferOfferTimeout,
+ @JsonProperty("recordBufferFullWait") Integer recordBufferFullWait,
+ @JsonProperty("fetchSequenceNumberTimeout") Integer fetchSequenceNumberTimeout,
+ @JsonProperty("fetchThreads") Integer fetchThreads,
+ @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+ @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
+ @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
+ @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
+ @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll,
+ @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod,
+ @JsonProperty("extra") String extra
+ )
+ {
+ super(
+ maxRowsInMemory,
+ maxBytesInMemory,
+ maxRowsPerSegment,
+ maxTotalRows,
+ intermediatePersistPeriod,
+ basePersistDirectory,
+ maxPendingPersists,
+ indexSpec,
+ buildV9Directly,
+ reportParseExceptions,
+ handoffConditionTimeout,
+ resetOffsetAutomatically,
+ skipSequenceNumberAvailabilityCheck,
+ recordBufferSize,
+ recordBufferOfferTimeout,
+ recordBufferFullWait,
+ fetchSequenceNumberTimeout,
+ fetchThreads,
+ segmentWriteOutMediumFactory,
+ logParseExceptions,
+ maxParseExceptions,
+ maxSavedParseExceptions,
+ maxRecordsPerPoll,
+ intermediateHandoffPeriod
+ );
+ this.extra = extra;
+ }
+
+ /**
+ * Copy constructor. It copies every setting from {@code base} through its getters, then attaches
+ * {@code extra}.
+ */
+ public TestModifiedKinesisIndexTaskTuningConfig(KinesisIndexTaskTuningConfig base, String extra)
+ {
+ super(
+ base.getMaxRowsInMemory(),
+ base.getMaxBytesInMemory(),
+ base.getMaxRowsPerSegment(),
+ base.getMaxTotalRows(),
+ base.getIntermediatePersistPeriod(),
+ base.getBasePersistDirectory(),
+ base.getMaxPendingPersists(),
+ base.getIndexSpec(),
+ base.getBuildV9Directly(),
+ base.isReportParseExceptions(),
+ base.getHandoffConditionTimeout(),
+ base.isResetOffsetAutomatically(),
+ base.isSkipSequenceNumberAvailabilityCheck(),
+ base.getRecordBufferSize(),
+ base.getRecordBufferOfferTimeout(),
+ base.getRecordBufferFullWait(),
+ base.getFetchSequenceNumberTimeout(),
+ base.getFetchThreads(),
+ base.getSegmentWriteOutMediumFactory(),
+ base.isLogParseExceptions(),
+ base.getMaxParseExceptions(),
+ base.getMaxSavedParseExceptions(),
+ base.getMaxRecordsPerPoll(),
+ base.getIntermediateHandoffPeriod()
+ );
+ this.extra = extra;
+ }
+
+ /**
+ * Returns the additional "extra" property; exposed to Jackson under that name.
+ */
+ @JsonProperty("extra")
+ public String getExtra()
+ {
+ return extra;
+ }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index f904e85..10f97ea 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -74,6 +74,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.EntryExistsException;
+import org.apache.druid.segment.indexing.DataSchema;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
@@ -168,6 +169,31 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
@Nullable Set<PartitionIdType> exclusiveStartSequenceNumberPartitions
)
{
+ this(
+ groupId,
+ startingSequences,
+ minimumMessageTime,
+ maximumMessageTime,
+ exclusiveStartSequenceNumberPartitions,
+ generateSequenceName(
+ startingSequences,
+ minimumMessageTime,
+ maximumMessageTime,
+ spec.getDataSchema(),
+ taskTuningConfig
+ )
+ );
+ }
+
+ TaskGroup(
+ int groupId,
+ ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences,
+ Optional<DateTime> minimumMessageTime,
+ Optional<DateTime> maximumMessageTime,
+ Set<PartitionIdType> exclusiveStartSequenceNumberPartitions,
+ String baseSequenceName
+ )
+ {
this.groupId = groupId;
this.startingSequences = startingSequences;
this.minimumMessageTime = minimumMessageTime;
@@ -176,7 +202,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
this.exclusiveStartSequenceNumberPartitions = exclusiveStartSequenceNumberPartitions != null
? exclusiveStartSequenceNumberPartitions
: Collections.emptySet();
- this.baseSequenceName = generateSequenceName(startingSequences, minimumMessageTime, maximumMessageTime);
+ this.baseSequenceName = baseSequenceName;
}
int addNewCheckpoint(Map<PartitionIdType, SequenceOffsetType> checkpoint)
@@ -1154,12 +1180,18 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
if (metadataUpdateSuccess) {
- resetMetadata.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap().keySet().forEach(partition -> {
- final int groupId = getTaskGroupIdForPartition(partition);
- killTaskGroupForPartitions(ImmutableSet.of(partition), "DataSourceMetadata is updated while reset");
- activelyReadingTaskGroups.remove(groupId);
- partitionGroups.get(groupId).replaceAll((partitionId, sequence) -> getNotSetMarker());
- });
+ resetMetadata.getSeekableStreamSequenceNumbers()
+ .getPartitionSequenceNumberMap()
+ .keySet()
+ .forEach(partition -> {
+ final int groupId = getTaskGroupIdForPartition(partition);
+ killTaskGroupForPartitions(
+ ImmutableSet.of(partition),
+ "DataSourceMetadata is updated while reset"
+ );
+ activelyReadingTaskGroups.remove(groupId);
+ partitionGroups.get(groupId).replaceAll((partitionId, sequence) -> getNotSetMarker());
+ });
} else {
throw new ISE("Unable to reset metadata");
}
@@ -1257,7 +1289,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
futureTaskIds.add(taskId);
futures.add(
Futures.transform(
- taskClient.getStatusAsync(taskId), new Function<SeekableStreamIndexTaskRunner.Status, Boolean>()
+ taskClient.getStatusAsync(taskId),
+ new Function<SeekableStreamIndexTaskRunner.Status, Boolean>()
{
@Override
public Boolean apply(SeekableStreamIndexTaskRunner.Status status)
@@ -1271,7 +1304,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
.keySet()
.forEach(
partition -> addDiscoveredTaskToPendingCompletionTaskGroups(
- getTaskGroupIdForPartition(partition),
+ getTaskGroupIdForPartition(
+ partition),
taskId,
seekableStreamIndexTask.getIOConfig()
.getStartSequenceNumbers()
@@ -1295,7 +1329,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
succeeded = true;
SequenceOffsetType previousOffset = partitionOffsets.putIfAbsent(partition, sequence);
if (previousOffset != null
- && (makeSequenceNumber(previousOffset).compareTo(makeSequenceNumber(sequence))) < 0) {
+ && (makeSequenceNumber(previousOffset)
+ .compareTo(makeSequenceNumber(
+ sequence))) < 0) {
succeeded = partitionOffsets.replace(partition, previousOffset, sequence);
}
} while (!succeeded);
@@ -1311,7 +1347,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
taskId
);
try {
- stopTask(taskId, false).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
+ stopTask(taskId, false)
+ .get(futureTimeoutInSeconds, TimeUnit.SECONDS);
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
log.warn(e, "Exception while stopping task");
@@ -1327,7 +1364,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
taskId
);
try {
- stopTask(taskId, false).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
+ stopTask(taskId, false)
+ .get(futureTimeoutInSeconds, TimeUnit.SECONDS);
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
log.warn(e, "Exception while stopping task");
@@ -1338,6 +1376,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
taskGroupId,
k -> {
log.info("Creating a new task group for taskGroupId[%d]", taskGroupId);
+ // We reassign the task's original base sequence name (from the existing task) to the
+ // task group so that the replica segment allocations are the same.
return new TaskGroup(
taskGroupId,
ImmutableMap.copyOf(
@@ -1349,7 +1389,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
seekableStreamIndexTask.getIOConfig().getMaximumMessageTime(),
seekableStreamIndexTask.getIOConfig()
.getStartSequenceNumbers()
- .getExclusivePartitions()
+ .getExclusivePartitions(),
+ seekableStreamIndexTask.getIOConfig().getBaseSequenceName()
);
}
);
@@ -1362,6 +1403,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
taskId
);
}
+ verifySameSequenceNameForAllTasksInGroup(taskGroupId);
}
}
return true;
@@ -1371,7 +1413,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return null;
}
}
-
}, workerExec
)
);
@@ -1379,7 +1420,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
-
List<Boolean> results = Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
for (int i = 0; i < results.size(); i++) {
if (results.get(i) == null) {
@@ -1424,10 +1464,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
final List<String> taskIds = new ArrayList<>();
for (String taskId : taskGroup.taskIds()) {
- final ListenableFuture<TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>> checkpointsFuture = taskClient.getCheckpointsAsync(
- taskId,
- true
- );
+ final ListenableFuture<TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>> checkpointsFuture =
+ taskClient.getCheckpointsAsync(
+ taskId,
+ true
+ );
futures.add(checkpointsFuture);
taskIds.add(taskId);
}
@@ -1612,6 +1653,34 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
taskGroupList.add(newTaskGroup);
}
+  /**
+   * Sanity check: every task in the actively-reading task group {@code groupId} must have the same base
+   * sequence name as the group itself.
+   *
+   * A task that cannot be loaded from task storage, or whose type does not match this supervisor, counts
+   * as a mismatch.
+   *
+   * @throws ISE if no actively-reading task group exists for {@code groupId}, or if any task's base
+   *             sequence name differs from the group's
+   */
+  private void verifySameSequenceNameForAllTasksInGroup(int groupId)
+  {
+    // Read the group once so the name and the task set come from the same TaskGroup instance.
+    final TaskGroup taskGroup = activelyReadingTaskGroups.get(groupId);
+    if (taskGroup == null) {
+      throw new ISE("No actively reading task group with ID [%s]", groupId);
+    }
+    final String taskGroupSequenceName = taskGroup.baseSequenceName;
+
+    // Evaluate one boolean per task. Previously the stream mapped to a mix of Boolean and String values,
+    // and correctness depended on Boolean.equals(String) being false.
+    final boolean allSequenceNamesMatch = taskGroup.tasks
+        .keySet()
+        .stream()
+        .allMatch(taskId -> {
+          final Optional<Task> taskOptional = taskStorage.getTask(taskId);
+          if (!taskOptional.isPresent() || !doesTaskTypeMatchSupervisor(taskOptional.get())) {
+            // A task we cannot load, or of a different type, cannot be confirmed to match.
+            return false;
+          }
+          @SuppressWarnings("unchecked")
+          final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task =
+              (SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>) taskOptional.get();
+          return task.getIOConfig().getBaseSequenceName().equals(taskGroupSequenceName);
+        });
+
+    if (!allSequenceNamesMatch) {
+      throw new ISE(
+          "Base sequence names do not match for the tasks in the task group with ID [%s]",
+          groupId
+      );
+    }
+  }
+
private ListenableFuture<Void> stopTask(final String id, final boolean publish)
{
return Futures.transform(
@@ -1631,7 +1700,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
}
- private boolean isTaskCurrent(int taskGroupId, String taskId)
+ @VisibleForTesting
+ public boolean isTaskCurrent(int taskGroupId, String taskId)
{
Optional<Task> taskOptional = taskStorage.getTask(taskId);
if (!taskOptional.isPresent() || !doesTaskTypeMatchSupervisor(taskOptional.get())) {
@@ -1639,22 +1709,39 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
@SuppressWarnings("unchecked")
- SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task = (SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>) taskOptional
- .get();
+ SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task =
+ (SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>) taskOptional.get();
+
+ // We recompute the sequence name hash for the supervisor's own configuration and compare this to the hash created
+ // by rehashing the task's sequence name using the most up-to-date class definitions of tuning config and
+ // data schema. Recomputing both ensures that forwards-compatible tasks won't be killed (which would occur
+ // if the hash generated using the old class definitions was used).
+ String taskSequenceName = generateSequenceName(
+ task.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(),
+ task.getIOConfig().getMinimumMessageTime(),
+ task.getIOConfig().getMaximumMessageTime(),
+ task.getDataSchema(),
+ task.getTuningConfig()
+ );
- String taskSequenceName = task.getIOConfig().getBaseSequenceName();
if (activelyReadingTaskGroups.get(taskGroupId) != null) {
- return Preconditions
- .checkNotNull(activelyReadingTaskGroups.get(taskGroupId), "null taskGroup for taskId[%s]", taskGroupId)
- .baseSequenceName
- .equals(taskSequenceName);
+ TaskGroup taskGroup = activelyReadingTaskGroups.get(taskGroupId);
+ return generateSequenceName(
+ taskGroup.startingSequences,
+ taskGroup.minimumMessageTime,
+ taskGroup.maximumMessageTime,
+ spec.getDataSchema(),
+ taskTuningConfig
+ ).equals(taskSequenceName);
} else {
return generateSequenceName(
task.getIOConfig()
.getStartSequenceNumbers()
.getPartitionSequenceNumberMap(),
task.getIOConfig().getMinimumMessageTime(),
- task.getIOConfig().getMaximumMessageTime()
+ task.getIOConfig().getMaximumMessageTime(),
+ spec.getDataSchema(),
+ taskTuningConfig
).equals(taskSequenceName);
}
}
@@ -1663,7 +1750,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
protected String generateSequenceName(
Map<PartitionIdType, SequenceOffsetType> startPartitions,
Optional<DateTime> minimumMessageTime,
- Optional<DateTime> maximumMessageTime
+ Optional<DateTime> maximumMessageTime,
+ DataSchema dataSchema,
+ SeekableStreamIndexTaskTuningConfig tuningConfig
)
{
StringBuilder sb = new StringBuilder();
@@ -1676,17 +1765,17 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
String minMsgTimeStr = (minimumMessageTime.isPresent() ? String.valueOf(minimumMessageTime.get().getMillis()) : "");
String maxMsgTimeStr = (maximumMessageTime.isPresent() ? String.valueOf(maximumMessageTime.get().getMillis()) : "");
- String dataSchema, tuningConfig;
+ String dataSchemaStr, tuningConfigStr;
try {
- dataSchema = sortingMapper.writeValueAsString(spec.getDataSchema());
- tuningConfig = sortingMapper.writeValueAsString(taskTuningConfig);
+ dataSchemaStr = sortingMapper.writeValueAsString(dataSchema);
+ tuningConfigStr = sortingMapper.writeValueAsString(tuningConfig);
}
catch (JsonProcessingException e) {
throw Throwables.propagate(e);
}
- String hashCode = DigestUtils.sha1Hex(dataSchema
- + tuningConfig
+ String hashCode = DigestUtils.sha1Hex(dataSchemaStr
+ + tuningConfigStr
+ partitionOffsetStr
+ minMsgTimeStr
+ maxMsgTimeStr)
@@ -2283,7 +2372,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
exclusiveStartSequenceNumberPartitions
)
);
-
}
}
@@ -2468,8 +2556,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
.get(groupId)
.exclusiveStartSequenceNumberPartitions;
- DateTime minimumMessageTime = activelyReadingTaskGroups.get(groupId).minimumMessageTime.orNull();
- DateTime maximumMessageTime = activelyReadingTaskGroups.get(groupId).maximumMessageTime.orNull();
+ DateTime minimumMessageTime = group.minimumMessageTime.orNull();
+ DateTime maximumMessageTime = group.maximumMessageTime.orNull();
SeekableStreamIndexTaskIOConfig newIoConfig = createTaskIoConfig(
groupId,
@@ -2482,7 +2570,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
ioConfig
);
-
List<SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>> taskList = createIndexTasks(
replicas,
group.baseSequenceName,
@@ -2705,7 +2792,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
* calculates the taskgroup id that the given partition belongs to.
* different between Kafka/Kinesis since Kinesis uses String as partition id
*
- * @param partition paritition id
+ * @param partition partition id
*
* @return taskgroup id
*/
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
index 6c5c5d4..58a2c4b 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
@@ -48,6 +48,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@@ -375,4 +376,82 @@ public class DataSchemaTest
)
);
}
+
+  /**
+   * Serializes a plain {@link DataSchema} and deserializes the JSON as {@link TestModifiedDataSchema}, which declares
+   * an additional "extra" property. This simulates reading a schema written before that field existed: the new
+   * property must come back null and every pre-existing field must be preserved.
+   */
+  @Test
+  public void testSerdeWithUpdatedDataSchemaAddedField() throws IOException
+  {
+    Map<String, Object> parser = jsonMapper.convertValue(
+        new StringInputRowParser(
+            new JSONParseSpec(
+                new TimestampSpec("time", "auto", null),
+                new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimB", "dimA")), null, null),
+                null,
+                null
+            ),
+            null
+        ), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+    );
+
+    DataSchema originalSchema = new DataSchema(
+        "test",
+        parser,
+        new AggregatorFactory[]{
+            new DoubleSumAggregatorFactory("metric1", "col1"),
+            new DoubleSumAggregatorFactory("metric2", "col2"),
+        },
+        new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))),
+        null,
+        jsonMapper
+    );
+
+    String serialized = jsonMapper.writeValueAsString(originalSchema);
+    TestModifiedDataSchema deserialized = jsonMapper.readValue(serialized, TestModifiedDataSchema.class);
+
+    // The property absent from the serialized JSON must deserialize as null.
+    Assert.assertNull(deserialized.getExtra());
+    Assert.assertEquals(originalSchema.getDataSource(), deserialized.getDataSource());
+    Assert.assertEquals(originalSchema.getGranularitySpec(), deserialized.getGranularitySpec());
+    Assert.assertEquals(originalSchema.getParser().getParseSpec(), deserialized.getParser().getParseSpec());
+    Assert.assertArrayEquals(originalSchema.getAggregators(), deserialized.getAggregators());
+    Assert.assertEquals(originalSchema.getTransformSpec(), deserialized.getTransformSpec());
+    Assert.assertEquals(originalSchema.getParserMap(), deserialized.getParserMap());
+  }
+
+  @Test
+  public void testSerdeWithUpdatedDataSchemaRemovedField() throws IOException
+  {
+    // Parser spec, converted to the plain-map form that the schema constructors take.
+    final JSONParseSpec parseSpec = new JSONParseSpec(
+        new TimestampSpec("time", "auto", null),
+        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimB", "dimA")), null, null),
+        null,
+        null
+    );
+    final Map<String, Object> parserMap = jsonMapper.convertValue(
+        new StringInputRowParser(parseSpec, null),
+        JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+    );
+
+    final AggregatorFactory[] metrics = {
+        new DoubleSumAggregatorFactory("metric1", "col1"),
+        new DoubleSumAggregatorFactory("metric2", "col2")
+    };
+
+    // Write a schema that carries the "extra" field, then read it back with DataSchema, which does not know it.
+    final TestModifiedDataSchema schemaWithExtra = new TestModifiedDataSchema(
+        "test",
+        parserMap,
+        metrics,
+        new ArbitraryGranularitySpec(Granularities.DAY, ImmutableList.of(Intervals.of("2014/2015"))),
+        null,
+        jsonMapper,
+        "some arbitrary string"
+    );
+    final String json = jsonMapper.writeValueAsString(schemaWithExtra);
+    final DataSchema roundTripped = jsonMapper.readValue(json, DataSchema.class);
+
+    // Every field DataSchema itself declares must survive the round trip unchanged.
+    Assert.assertEquals(schemaWithExtra.getDataSource(), roundTripped.getDataSource());
+    Assert.assertEquals(schemaWithExtra.getGranularitySpec(), roundTripped.getGranularitySpec());
+    Assert.assertEquals(schemaWithExtra.getParser().getParseSpec(), roundTripped.getParser().getParseSpec());
+    Assert.assertArrayEquals(schemaWithExtra.getAggregators(), roundTripped.getAggregators());
+    Assert.assertEquals(schemaWithExtra.getTransformSpec(), roundTripped.getTransformSpec());
+    Assert.assertEquals(schemaWithExtra.getParserMap(), roundTripped.getParserMap());
+  }
}
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/TestModifiedDataSchema.java b/server/src/test/java/org/apache/druid/segment/indexing/TestModifiedDataSchema.java
new file mode 100644
index 0000000..ca030fe
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/segment/indexing/TestModifiedDataSchema.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.indexing;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.indexing.granularity.GranularitySpec;
+import org.apache.druid.segment.transform.TransformSpec;
+
+import java.util.Map;
+
+/**
+ * Test-only subclass of {@link DataSchema} that adds a single extra string property named {@code "extra"}.
+ * DataSchemaTest round-trips JSON between this class and DataSchema in both directions. This simulates a schema
+ * class definition that gained or lost a field between versions.
+ */
+public class TestModifiedDataSchema extends DataSchema
+{
+ // The additional property that DataSchema itself does not declare.
+ private final String extra;
+
+ // Takes the arguments that are forwarded unchanged to DataSchema's constructor, plus the "extra" value.
+ @JsonCreator
+ public TestModifiedDataSchema(
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("parser") Map<String, Object> parser,
+ @JsonProperty("metricsSpec") AggregatorFactory[] aggregators,
+ @JsonProperty("granularitySpec") GranularitySpec granularitySpec,
+ @JsonProperty("transformSpec") TransformSpec transformSpec,
+ @JacksonInject ObjectMapper jsonMapper,
+ @JsonProperty("extra") String extra
+ )
+ {
+ super(dataSource, parser, aggregators, granularitySpec, transformSpec, jsonMapper);
+ this.extra = extra;
+ }
+
+ // Exposed under the "extra" JSON key, so instances of this class serialize the additional field.
+ @JsonProperty("extra")
+ public String getExtra()
+ {
+ return extra;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org