You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2020/10/24 01:34:52 UTC
[druid] branch master updated: Configurable Index Type (#10335)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f3a2903 Configurable Index Type (#10335)
f3a2903 is described below
commit f3a2903218573f5d336b082b1c9b8a60a19e8c54
Author: Liran Funaro <li...@gmail.com>
AuthorDate: Sat Oct 24 04:34:26 2020 +0300
Configurable Index Type (#10335)
* Introduce a Configurable Index Type
* Change to @UnstableApi
* Add AppendableIndexSpecTest
* Update doc
* Add spelling exception
* Add tests coverage
* Revert some of the changes to reduce diff
* Minor fixes
* Update getMaxBytesInMemoryOrDefault() comment
* Fix typo, remove redundant interface
* Remove off-heap spec (postponed to a later PR)
* Add javadocs to AppendableIndexSpec
* Describe testCreateTask()
* Add tests for AppendableIndexSpec within TuningConfig
* Modify hashCode() to conform with equals()
* Add comment where building incremental-index
* Add "EqualsVerifier" tests
* Revert some of the API back to AppenderatorConfig
* Don't use multi-line comments
* Remove knob documentation (deferred)
---
.../MaterializedViewSupervisorSpec.java | 1 +
.../MaterializedViewSupervisorTest.java | 32 +++++-
extensions-core/kafka-indexing-service/pom.xml | 5 +
.../indexing/kafka/KafkaIndexTaskTuningConfig.java | 4 +
.../supervisor/KafkaSupervisorTuningConfig.java | 8 +-
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 1 +
.../kafka/KafkaIndexTaskTuningConfigTest.java | 21 +++-
.../kafka/supervisor/KafkaSupervisorTest.java | 5 +
.../KafkaSupervisorTuningConfigTest.java | 6 +-
.../TestModifiedKafkaIndexTaskTuningConfig.java | 3 +
extensions-core/kinesis-indexing-service/pom.xml | 5 +
.../kinesis/KinesisIndexTaskTuningConfig.java | 4 +
.../supervisor/KinesisSupervisorTuningConfig.java | 5 +
.../kinesis/KinesisIndexTaskSerdeTest.java | 1 +
.../indexing/kinesis/KinesisIndexTaskTest.java | 1 +
.../kinesis/KinesisIndexTaskTuningConfigTest.java | 21 +++-
.../kinesis/supervisor/KinesisSupervisorTest.java | 3 +
.../KinesisSupervisorTuningConfigTest.java | 6 +-
.../TestModifiedKinesisIndexTaskTuningConfig.java | 4 +
.../apache/druid/indexer/HadoopTuningConfig.java | 18 ++-
.../apache/druid/indexer/IndexGeneratorJob.java | 8 +-
.../druid/indexer/BatchDeltaIngestionTest.java | 1 +
.../indexer/DetermineHashedPartitionsJobTest.java | 1 +
.../druid/indexer/DeterminePartitionsJobTest.java | 1 +
.../indexer/HadoopDruidIndexerConfigTest.java | 1 +
.../druid/indexer/HadoopTuningConfigTest.java | 3 +
.../druid/indexer/IndexGeneratorJobTest.java | 1 +
.../org/apache/druid/indexer/JobHelperTest.java | 1 +
.../indexer/path/GranularityPathSpecTest.java | 1 +
.../index/RealtimeAppenderatorTuningConfig.java | 19 ++-
.../druid/indexing/common/task/CompactionTask.java | 1 +
.../druid/indexing/common/task/IndexTask.java | 25 +++-
.../parallel/ParallelIndexSupervisorTask.java | 1 +
.../batch/parallel/ParallelIndexTuningConfig.java | 5 +
.../SeekableStreamIndexTaskTuningConfig.java | 19 ++-
.../AppenderatorDriverRealtimeIndexTaskTest.java | 1 +
.../task/ClientCompactionTaskQuerySerdeTest.java | 1 +
.../common/task/CompactionTaskRunTest.java | 1 +
.../indexing/common/task/CompactionTaskTest.java | 6 +
.../indexing/common/task/IndexTaskSerdeTest.java | 6 +
.../druid/indexing/common/task/IndexTaskTest.java | 13 +++
.../common/task/RealtimeIndexTaskTest.java | 1 +
.../druid/indexing/common/task/TaskSerdeTest.java | 3 +
.../AbstractParallelIndexSupervisorTaskTest.java | 2 +
.../ParallelIndexSupervisorTaskKillTest.java | 1 +
.../ParallelIndexSupervisorTaskResourceTest.java | 1 +
.../ParallelIndexSupervisorTaskSerdeTest.java | 1 +
.../parallel/ParallelIndexSupervisorTaskTest.java | 1 +
.../parallel/ParallelIndexTestingFactory.java | 1 +
.../parallel/ParallelIndexTuningConfigTest.java | 16 +++
.../parallel/SinglePhaseParallelIndexingTest.java | 1 +
.../druid/indexing/overlord/TaskLifecycleTest.java | 5 +
.../SeekableStreamSupervisorStateTest.java | 1 +
.../druid/jackson/AppendableIndexModule.java | 36 +++---
.../apache/druid/jackson/DefaultObjectMapper.java | 1 +
.../druid/query/groupby/GroupByQueryHelper.java | 31 ++---
.../incremental/AppendableIndexBuilder.java | 128 +++++++++++++++++++++
.../segment/incremental/AppendableIndexSpec.java | 30 ++---
.../segment/incremental/IncrementalIndex.java | 109 ++++--------------
.../incremental/OffheapIncrementalIndex.java | 47 ++++++--
.../incremental/OnheapIncrementalIndex.java | 49 ++++++++
.../druid/segment/data/IncrementalIndexTest.java | 33 +++---
.../segment/incremental/IncrementalIndexTest.java | 9 +-
.../segment/indexing/RealtimeTuningConfig.java | 19 ++-
.../druid/segment/indexing/TuningConfig.java | 37 +++++-
.../realtime/appenderator/AppenderatorConfig.java | 8 +-
.../realtime/appenderator/AppenderatorImpl.java | 5 +-
.../UnifiedIndexerAppenderatorsManager.java | 7 ++
.../segment/realtime/plumber/RealtimePlumber.java | 7 +-
.../druid/segment/realtime/plumber/Sink.java | 12 +-
.../segment/indexing/RealtimeTuningConfigTest.java | 6 +-
.../appenderator/AppenderatorPlumberTest.java | 1 +
.../realtime/appenderator/AppenderatorTester.java | 1 +
.../DefaultOfflineAppenderatorFactoryTest.java | 1 +
.../plumber/RealtimePlumberSchoolTest.java | 11 +-
.../druid/segment/realtime/plumber/SinkTest.java | 9 +-
.../druid/cli/validate/DruidJsonValidatorTest.java | 1 +
77 files changed, 687 insertions(+), 215 deletions(-)
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
index dd38513..5388388 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
@@ -181,6 +181,7 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec
tuningConfig.getShardSpecs(),
tuningConfig.getIndexSpec(),
tuningConfig.getIndexSpecForIntermediatePersists(),
+ tuningConfig.getAppendableIndexSpec(),
tuningConfig.getRowFlushBoundary(),
tuningConfig.getMaxBytesInMemory(),
tuningConfig.isLeaveIntermediate(),
diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
index 9694c2f..8dc6e53 100644
--- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
+++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
@@ -83,6 +83,7 @@ public class MaterializedViewSupervisorTest
private TaskQueue taskQueue;
private MaterializedViewSupervisor supervisor;
private String derivativeDatasourceName;
+ private MaterializedViewSupervisorSpec spec;
private final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
@Before
@@ -103,7 +104,7 @@ public class MaterializedViewSupervisorTest
taskQueue = EasyMock.createMock(TaskQueue.class);
taskQueue.start();
objectMapper.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed"));
- MaterializedViewSupervisorSpec spec = new MaterializedViewSupervisorSpec(
+ spec = new MaterializedViewSupervisorSpec(
"base",
new DimensionsSpec(Collections.singletonList(new StringDimensionSchema("dim")), null, null),
new AggregatorFactory[]{new LongSumAggregatorFactory("m1", "m1")},
@@ -317,6 +318,35 @@ public class MaterializedViewSupervisorTest
}
+ /**
+ * Verifies that creating HadoopIndexTask compleates without raising exception.
+ */
+ @Test
+ public void testCreateTask()
+ {
+ List<DataSegment> baseSegments = Collections.singletonList(
+ new DataSegment(
+ "base",
+ Intervals.of("2015-01-02T00Z/2015-01-03T00Z"),
+ "2015-01-03",
+ ImmutableMap.of(),
+ ImmutableList.of("dim1", "dim2"),
+ ImmutableList.of("m1"),
+ new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null),
+ 9,
+ 1024
+ )
+ );
+
+ HadoopIndexTask task = spec.createTask(
+ Intervals.of("2015-01-02T00Z/2015-01-03T00Z"),
+ "2015-01-03",
+ baseSegments
+ );
+
+ Assert.assertNotNull(task);
+ }
+
@Test
public void testSuspendedDoesntRun()
{
diff --git a/extensions-core/kafka-indexing-service/pom.xml b/extensions-core/kafka-indexing-service/pom.xml
index d192b68..7cc1ebf 100644
--- a/extensions-core/kafka-indexing-service/pom.xml
+++ b/extensions-core/kafka-indexing-service/pom.xml
@@ -203,6 +203,11 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>nl.jqno.equalsverifier</groupId>
+ <artifactId>equalsverifier</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
index f23ee7b..8dc9a2f 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Period;
@@ -33,6 +34,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
{
@JsonCreator
public KafkaIndexTaskTuningConfig(
+ @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@@ -55,6 +57,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
)
{
super(
+ appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
@@ -81,6 +84,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir)
{
return new KafkaIndexTaskTuningConfig(
+ getAppendableIndexSpec(),
getMaxRowsInMemory(),
getMaxBytesInMemory(),
getMaxRowsPerSegment(),
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
index 2dca7f6..a9e0bfe 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.indexing.TuningConfigs;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Duration;
import org.joda.time.Period;
@@ -67,11 +67,13 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
null,
null,
null,
+ null,
null
);
}
public KafkaSupervisorTuningConfig(
+ @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@@ -100,6 +102,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
)
{
super(
+ appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
@@ -193,7 +196,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
"maxRowsInMemory=" + getMaxRowsInMemory() +
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
", maxTotalRows=" + getMaxTotalRows() +
- ", maxBytesInMemory=" + TuningConfigs.getMaxBytesInMemoryOrDefault(getMaxBytesInMemory()) +
+ ", maxBytesInMemory=" + getMaxBytesInMemoryOrDefault() +
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
", basePersistDirectory=" + getBasePersistDirectory() +
", maxPendingPersists=" + getMaxPendingPersists() +
@@ -219,6 +222,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
public KafkaIndexTaskTuningConfig convertToTaskTuningConfig()
{
return new KafkaIndexTaskTuningConfig(
+ getAppendableIndexSpec(),
getMaxRowsInMemory(),
getMaxBytesInMemory(),
getMaxRowsPerSegment(),
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index f4098fc..9b53735 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2497,6 +2497,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
) throws JsonProcessingException
{
final KafkaIndexTaskTuningConfig tuningConfig = new KafkaIndexTaskTuningConfig(
+ null,
1000,
null,
maxRowsPerSegment,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
index 3a930db..8888a69 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
@@ -21,11 +21,13 @@ package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
import org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.indexing.TuningConfig;
import org.joda.time.Period;
import org.junit.Assert;
@@ -60,6 +62,7 @@ public class KafkaIndexTaskTuningConfigTest
);
Assert.assertNotNull(config.getBasePersistDirectory());
+ Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
Assert.assertNull(config.getMaxTotalRows());
@@ -85,7 +88,8 @@ public class KafkaIndexTaskTuningConfigTest
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100,\n"
+ " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n"
- + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n"
+ + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" },\n"
+ + " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
+ "}";
KafkaIndexTaskTuningConfig config = (KafkaIndexTaskTuningConfig) mapper.readValue(
@@ -99,6 +103,7 @@ public class KafkaIndexTaskTuningConfigTest
);
Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
+ Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(100, config.getMaxRowsInMemory());
Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
Assert.assertNotEquals(null, config.getMaxTotalRows());
@@ -115,6 +120,7 @@ public class KafkaIndexTaskTuningConfigTest
public void testConvert()
{
KafkaSupervisorTuningConfig original = new KafkaSupervisorTuningConfig(
+ null,
1,
null,
2,
@@ -142,6 +148,7 @@ public class KafkaIndexTaskTuningConfigTest
);
KafkaIndexTaskTuningConfig copy = (KafkaIndexTaskTuningConfig) original.convertToTaskTuningConfig();
+ Assert.assertEquals(original.getAppendableIndexSpec(), copy.getAppendableIndexSpec());
Assert.assertEquals(1, copy.getMaxRowsInMemory());
Assert.assertEquals(2, copy.getMaxRowsPerSegment().intValue());
Assert.assertNotEquals(null, copy.getMaxTotalRows());
@@ -158,6 +165,7 @@ public class KafkaIndexTaskTuningConfigTest
public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
{
KafkaIndexTaskTuningConfig base = new KafkaIndexTaskTuningConfig(
+ null,
1,
null,
2,
@@ -183,6 +191,7 @@ public class KafkaIndexTaskTuningConfigTest
mapper.readValue(serialized, TestModifiedKafkaIndexTaskTuningConfig.class);
Assert.assertEquals(null, deserialized.getExtra());
+ Assert.assertEquals(base.getAppendableIndexSpec(), deserialized.getAppendableIndexSpec());
Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
@@ -206,6 +215,7 @@ public class KafkaIndexTaskTuningConfigTest
public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
{
TestModifiedKafkaIndexTaskTuningConfig base = new TestModifiedKafkaIndexTaskTuningConfig(
+ null,
1,
null,
2,
@@ -231,6 +241,7 @@ public class KafkaIndexTaskTuningConfigTest
KafkaIndexTaskTuningConfig deserialized =
mapper.readValue(serialized, KafkaIndexTaskTuningConfig.class);
+ Assert.assertEquals(base.getAppendableIndexSpec(), deserialized.getAppendableIndexSpec());
Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
@@ -249,4 +260,12 @@ public class KafkaIndexTaskTuningConfigTest
Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
}
+
+ @Test
+ public void testEqualsAndHashCode()
+ {
+ EqualsVerifier.forClass(KafkaIndexTaskTuningConfig.class)
+ .usingGetClass()
+ .verify();
+ }
}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 905abd1..251d837 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -270,6 +270,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
null,
null,
null,
+ null,
null
),
null
@@ -3070,6 +3071,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
kafkaHost,
dataSchema,
new KafkaSupervisorTuningConfig(
+ null,
1000,
null,
50000,
@@ -3109,6 +3111,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
DataSchema modifiedDataSchema = getDataSchema("some other datasource");
KafkaSupervisorTuningConfig modifiedTuningConfig = new KafkaSupervisorTuningConfig(
+ null,
42, // This is different
null,
50000,
@@ -3404,6 +3407,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
};
final KafkaSupervisorTuningConfig tuningConfig = new KafkaSupervisorTuningConfig(
+ null,
1000,
null,
50000,
@@ -3514,6 +3518,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
};
final KafkaSupervisorTuningConfig tuningConfig = new KafkaSupervisorTuningConfig(
+ null,
1000,
null,
50000,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
index 5859f90..7151eb0 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
@@ -25,6 +25,7 @@ import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.indexing.TuningConfig;
import org.joda.time.Duration;
import org.joda.time.Period;
@@ -59,6 +60,7 @@ public class KafkaSupervisorTuningConfigTest
);
Assert.assertNotNull(config.getBasePersistDirectory());
+ Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
@@ -94,7 +96,8 @@ public class KafkaSupervisorTuningConfigTest
+ " \"shutdownTimeout\": \"PT95S\",\n"
+ " \"offsetFetchPeriod\": \"PT20S\",\n"
+ " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n"
- + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n"
+ + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" },\n"
+ + " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
+ "}";
KafkaSupervisorTuningConfig config = (KafkaSupervisorTuningConfig) mapper.readValue(
@@ -108,6 +111,7 @@ public class KafkaSupervisorTuningConfigTest
);
Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
+ Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(100, config.getMaxRowsInMemory());
Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
index 27e69e8..06550e1 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Period;
@@ -37,6 +38,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuning
@JsonCreator
public TestModifiedKafkaIndexTaskTuningConfig(
+ @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@@ -60,6 +62,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuning
)
{
super(
+ appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
diff --git a/extensions-core/kinesis-indexing-service/pom.xml b/extensions-core/kinesis-indexing-service/pom.xml
index 6b08d50..0659bed 100644
--- a/extensions-core/kinesis-indexing-service/pom.xml
+++ b/extensions-core/kinesis-indexing-service/pom.xml
@@ -192,6 +192,11 @@
<artifactId>system-rules</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>nl.jqno.equalsverifier</groupId>
+ <artifactId>equalsverifier</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
index f033a6d..428f54f 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Period;
@@ -50,6 +51,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
@JsonCreator
public KinesisIndexTaskTuningConfig(
+ @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@@ -78,6 +80,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
)
{
super(
+ appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
@@ -154,6 +157,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
public KinesisIndexTaskTuningConfig withBasePersistDirectory(File dir)
{
return new KinesisIndexTaskTuningConfig(
+ getAppendableIndexSpec(),
getMaxRowsInMemory(),
getMaxBytesInMemory(),
getMaxRowsPerSegment(),
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
index 8c1a5fa..7cf49a3 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Duration;
import org.joda.time.Period;
@@ -75,11 +76,13 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
null,
null,
null,
+ null,
null
);
}
public KinesisSupervisorTuningConfig(
+ @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@@ -115,6 +118,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
)
{
super(
+ appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
@@ -248,6 +252,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
public KinesisIndexTaskTuningConfig convertToTaskTuningConfig()
{
return new KinesisIndexTaskTuningConfig(
+ getAppendableIndexSpec(),
getMaxRowsInMemory(),
getMaxBytesInMemory(),
getMaxRowsPerSegment(),
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
index 9ab058c..f6b3582 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
@@ -72,6 +72,7 @@ public class KinesisIndexTaskSerdeTest
null,
null,
null,
+ null,
null
);
private static final KinesisIndexTaskIOConfig IO_CONFIG = new KinesisIndexTaskIOConfig(
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 4aff50b..8ff5b43 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -2739,6 +2739,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
boolean resetOffsetAutomatically = false;
int maxRowsInMemory = 1000;
final KinesisIndexTaskTuningConfig tuningConfig = new KinesisIndexTaskTuningConfig(
+ null,
maxRowsInMemory,
null,
maxRowsPerSegment,
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
index 15e56c0..69a70a4 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
@@ -22,10 +22,12 @@ package org.apache.druid.indexing.kinesis;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfig;
import org.apache.druid.indexing.kinesis.test.TestModifiedKinesisIndexTaskTuningConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.indexing.TuningConfig;
import org.hamcrest.CoreMatchers;
import org.joda.time.Period;
@@ -66,6 +68,7 @@ public class KinesisIndexTaskTuningConfigTest
);
Assert.assertNotNull(config.getBasePersistDirectory());
+ Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
@@ -102,7 +105,8 @@ public class KinesisIndexTaskTuningConfigTest
+ " \"fetchSequenceNumberTimeout\": 6000,\n"
+ " \"resetOffsetAutomatically\": false,\n"
+ " \"skipSequenceNumberAvailabilityCheck\": true,\n"
- + " \"fetchThreads\": 2\n"
+ + " \"fetchThreads\": 2,\n"
+ + " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
+ "}";
KinesisIndexTaskTuningConfig config = (KinesisIndexTaskTuningConfig) mapper.readValue(
@@ -116,6 +120,7 @@ public class KinesisIndexTaskTuningConfigTest
);
Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
+ Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(100, config.getMaxRowsInMemory());
Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
@@ -136,6 +141,7 @@ public class KinesisIndexTaskTuningConfigTest
public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
{
KinesisIndexTaskTuningConfig base = new KinesisIndexTaskTuningConfig(
+ null,
1,
3L,
2,
@@ -168,6 +174,7 @@ public class KinesisIndexTaskTuningConfigTest
mapper.readValue(serialized, TestModifiedKinesisIndexTaskTuningConfig.class);
Assert.assertEquals(null, deserialized.getExtra());
+ Assert.assertEquals(base.getAppendableIndexSpec(), deserialized.getAppendableIndexSpec());
Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
@@ -195,6 +202,7 @@ public class KinesisIndexTaskTuningConfigTest
public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
{
KinesisIndexTaskTuningConfig base = new KinesisIndexTaskTuningConfig(
+ null,
1,
3L,
2,
@@ -226,6 +234,7 @@ public class KinesisIndexTaskTuningConfigTest
KinesisIndexTaskTuningConfig deserialized =
mapper.readValue(serialized, KinesisIndexTaskTuningConfig.class);
+ Assert.assertEquals(base.getAppendableIndexSpec(), deserialized.getAppendableIndexSpec());
Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory());
Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory());
Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment());
@@ -282,6 +291,7 @@ public class KinesisIndexTaskTuningConfigTest
public void testConvert()
{
KinesisSupervisorTuningConfig original = new KinesisSupervisorTuningConfig(
+ null,
1,
(long) 3,
2,
@@ -317,6 +327,7 @@ public class KinesisIndexTaskTuningConfigTest
);
KinesisIndexTaskTuningConfig copy = (KinesisIndexTaskTuningConfig) original.convertToTaskTuningConfig();
+ Assert.assertEquals(original.getAppendableIndexSpec(), copy.getAppendableIndexSpec());
Assert.assertEquals(1, copy.getMaxRowsInMemory());
Assert.assertEquals(3, copy.getMaxBytesInMemory());
Assert.assertEquals(2, copy.getMaxRowsPerSegment().intValue());
@@ -338,4 +349,12 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertEquals(100, copy.getMaxRecordsPerPoll());
Assert.assertEquals(new Period().withDays(Integer.MAX_VALUE), copy.getIntermediateHandoffPeriod());
}
+
+ @Test
+ public void testEqualsAndHashCode()
+ {
+ EqualsVerifier.forClass(KinesisIndexTaskTuningConfig.class)
+ .usingGetClass()
+ .verify();
+ }
}
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 1351662..3d57a6b 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -167,6 +167,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
supervisorRecordSupplier = createMock(KinesisRecordSupplier.class);
tuningConfig = new KinesisSupervisorTuningConfig(
+ null,
1000,
null,
50000,
@@ -3689,6 +3690,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
DataSchema modifiedDataSchema = getDataSchema("some other datasource");
KinesisSupervisorTuningConfig modifiedTuningConfig = new KinesisSupervisorTuningConfig(
+ null,
1000,
null,
50000,
@@ -4741,6 +4743,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
};
final KinesisSupervisorTuningConfig tuningConfig = new KinesisSupervisorTuningConfig(
+ null,
1000,
null,
50000,
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
index d503210..ff3edc4 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.kinesis.KinesisIndexingServiceModule;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.indexing.TuningConfig;
import org.joda.time.Duration;
import org.joda.time.Period;
@@ -58,6 +59,7 @@ public class KinesisSupervisorTuningConfigTest
);
Assert.assertNotNull(config.getBasePersistDirectory());
+ Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
@@ -92,7 +94,8 @@ public class KinesisSupervisorTuningConfigTest
+ " \"chatRetries\": 14,\n"
+ " \"httpTimeout\": \"PT15S\",\n"
+ " \"shutdownTimeout\": \"PT95S\",\n"
- + " \"repartitionTransitionDuration\": \"PT500S\"\n"
+ + " \"repartitionTransitionDuration\": \"PT500S\",\n"
+ + " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
+ "}";
KinesisSupervisorTuningConfig config = (KinesisSupervisorTuningConfig) mapper.readValue(
@@ -106,6 +109,7 @@ public class KinesisSupervisorTuningConfigTest
);
Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory());
+ Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(100, config.getMaxRowsInMemory());
Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
index e45168d..5b2e2bd 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskTuningConfig;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Period;
@@ -37,6 +38,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
@JsonCreator
public TestModifiedKinesisIndexTaskTuningConfig(
+ @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@@ -66,6 +68,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
)
{
super(
+ appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
@@ -98,6 +101,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
public TestModifiedKinesisIndexTaskTuningConfig(KinesisIndexTaskTuningConfig base, String extra)
{
super(
+ base.getAppendableIndexSpec(),
base.getMaxRowsInMemory(),
base.getMaxBytesInMemory(),
base.getMaxRowsPerSegment(),
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
index 2b80d28..f1a8cc8 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java
@@ -29,6 +29,7 @@ import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.indexing.TuningConfig;
import javax.annotation.Nullable;
@@ -56,6 +57,7 @@ public class HadoopTuningConfig implements TuningConfig
DEFAULT_SHARD_SPECS,
DEFAULT_INDEX_SPEC,
DEFAULT_INDEX_SPEC,
+ DEFAULT_APPENDABLE_INDEX,
DEFAULT_ROW_FLUSH_BOUNDARY,
0L,
false,
@@ -83,6 +85,7 @@ public class HadoopTuningConfig implements TuningConfig
private final Map<Long, List<HadoopyShardSpec>> shardSpecs;
private final IndexSpec indexSpec;
private final IndexSpec indexSpecForIntermediatePersists;
+ private final AppendableIndexSpec appendableIndexSpec;
private final int rowFlushBoundary;
private final long maxBytesInMemory;
private final boolean leaveIntermediate;
@@ -108,6 +111,7 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("shardSpecs") @Nullable Map<Long, List<HadoopyShardSpec>> shardSpecs,
final @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
final @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
+ final @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
final @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
final @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
@@ -140,8 +144,9 @@ public class HadoopTuningConfig implements TuningConfig
this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null
? DEFAULT_ROW_FLUSH_BOUNDARY
: maxRowsInMemoryCOMPAT : maxRowsInMemory;
+ this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
// initializing this to 0, it will be lazily initialized to a value
- // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
+ // @see #getMaxBytesInMemoryOrDefault()
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
this.leaveIntermediate = leaveIntermediate;
this.cleanupOnFailure = cleanupOnFailure == null ? true : cleanupOnFailure;
@@ -211,6 +216,13 @@ public class HadoopTuningConfig implements TuningConfig
return indexSpecForIntermediatePersists;
}
+ @JsonProperty
+ @Override
+ public AppendableIndexSpec getAppendableIndexSpec()
+ {
+ return appendableIndexSpec;
+ }
+
@JsonProperty("maxRowsInMemory")
public int getRowFlushBoundary()
{
@@ -218,6 +230,7 @@ public class HadoopTuningConfig implements TuningConfig
}
@JsonProperty
+ @Override
public long getMaxBytesInMemory()
{
return maxBytesInMemory;
@@ -327,6 +340,7 @@ public class HadoopTuningConfig implements TuningConfig
shardSpecs,
indexSpec,
indexSpecForIntermediatePersists,
+ appendableIndexSpec,
rowFlushBoundary,
maxBytesInMemory,
leaveIntermediate,
@@ -357,6 +371,7 @@ public class HadoopTuningConfig implements TuningConfig
shardSpecs,
indexSpec,
indexSpecForIntermediatePersists,
+ appendableIndexSpec,
rowFlushBoundary,
maxBytesInMemory,
leaveIntermediate,
@@ -387,6 +402,7 @@ public class HadoopTuningConfig implements TuningConfig
specs,
indexSpec,
indexSpecForIntermediatePersists,
+ appendableIndexSpec,
rowFlushBoundary,
maxBytesInMemory,
leaveIntermediate,
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
index 91982fd..abc0b3b 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java
@@ -50,7 +50,6 @@ import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
-import org.apache.druid.segment.indexing.TuningConfigs;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
@@ -302,11 +301,12 @@ public class IndexGeneratorJob implements Jobby
.withRollup(config.getSchema().getDataSchema().getGranularitySpec().isRollup())
.build();
- IncrementalIndex newIndex = new IncrementalIndex.Builder()
+ // Build the incremental-index according to the spec that was chosen by the user
+ IncrementalIndex newIndex = tuningConfig.getAppendableIndexSpec().builder()
.setIndexSchema(indexSchema)
.setMaxRowCount(tuningConfig.getRowFlushBoundary())
- .setMaxBytesInMemory(TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()))
- .buildOnheap();
+ .setMaxBytesInMemory(tuningConfig.getMaxBytesInMemoryOrDefault())
+ .build();
if (oldDimOrder != null && !indexSchema.getDimensionsSpec().hasCustomDimensions()) {
newIndex.loadDimensionIterable(oldDimOrder, oldCapabilities);
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
index 4b8d72e..644ae24 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java
@@ -465,6 +465,7 @@ public class BatchDeltaIngestionTest
null,
null,
null,
+ null,
false,
false,
false,
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java
index 93eda94..a127d51 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java
@@ -219,6 +219,7 @@ public class DetermineHashedPartitionsJobTest
null,
null,
null,
+ null,
false,
false,
false,
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java
index 6906eaf..047c104 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java
@@ -328,6 +328,7 @@ public class DeterminePartitionsJobTest
null,
null,
null,
+ null,
false,
false,
false,
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java
index c9937e0..9eabd41 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java
@@ -241,6 +241,7 @@ public class HadoopDruidIndexerConfigTest
null,
null,
null,
+ null,
false,
false,
false,
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java
index c1a7e95..fce828b 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.junit.Assert;
import org.junit.Test;
@@ -44,6 +45,7 @@ public class HadoopTuningConfigTest
null,
null,
null,
+ null,
100,
null,
true,
@@ -68,6 +70,7 @@ public class HadoopTuningConfigTest
Assert.assertEquals("/tmp/workingpath", actual.getWorkingPath());
Assert.assertEquals("version", actual.getVersion());
+ Assert.assertEquals(new OnheapIncrementalIndex.Spec(), actual.getAppendableIndexSpec());
Assert.assertNotNull(actual.getPartitionsSpec());
Assert.assertEquals(ImmutableMap.<Long, List<HadoopyShardSpec>>of(), actual.getShardSpecs());
Assert.assertEquals(new IndexSpec(), actual.getIndexSpec());
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
index cd93236..8d59554 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java
@@ -529,6 +529,7 @@ public class IndexGeneratorJobTest
null,
null,
null,
+ null,
maxRowsInMemory,
maxBytesInMemory,
true,
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java
index 1cdb9b2..3188762 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java
@@ -165,6 +165,7 @@ public class JobHelperTest
null,
null,
null,
+ null,
false,
false,
false,
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java
index 4da9ee4..68505be 100644
--- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java
@@ -63,6 +63,7 @@ public class GranularityPathSpecTest
null,
null,
null,
+ null,
false,
false,
false,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
index eec9b98..1538e15 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
@@ -26,6 +26,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@@ -37,9 +38,8 @@ import javax.annotation.Nullable;
import java.io.File;
@JsonTypeName("realtime_appenderator")
-public class RealtimeAppenderatorTuningConfig implements TuningConfig, AppenderatorConfig
+public class RealtimeAppenderatorTuningConfig implements AppenderatorConfig
{
- private static final int DEFAULT_MAX_ROWS_IN_MEMORY = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY;
private static final Period DEFAULT_INTERMEDIATE_PERSIST_PERIOD = new Period("PT10M");
private static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
private static final ShardSpec DEFAULT_SHARD_SPEC = new NumberedShardSpec(0, 1);
@@ -53,6 +53,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
return FileUtils.createTempDir("druid-realtime-persist");
}
+ private final AppendableIndexSpec appendableIndexSpec;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final DynamicPartitionsSpec partitionsSpec;
@@ -74,6 +75,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
@JsonCreator
public RealtimeAppenderatorTuningConfig(
+ @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@@ -93,9 +95,10 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
)
{
+ this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
- // initializing this to 0, it will be lazily intialized to a value
- // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
+ // initializing this to 0, it will be lazily initialized to a value
+ // @see #getMaxBytesInMemoryOrDefault()
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows);
this.intermediatePersistPeriod = intermediatePersistPeriod == null
@@ -137,6 +140,13 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
@Override
@JsonProperty
+ public AppendableIndexSpec getAppendableIndexSpec()
+ {
+ return appendableIndexSpec;
+ }
+
+ @Override
+ @JsonProperty
public int getMaxRowsInMemory()
{
return maxRowsInMemory;
@@ -260,6 +270,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
public RealtimeAppenderatorTuningConfig withBasePersistDirectory(File dir)
{
return new RealtimeAppenderatorTuningConfig(
+ appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
partitionsSpec.getMaxRowsPerSegment(),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index ba2502f..afaf0cf 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -219,6 +219,7 @@ public class CompactionTask extends AbstractBatchIndexTask
return new ParallelIndexTuningConfig(
null,
indexTuningConfig.getMaxRowsPerSegment(),
+ indexTuningConfig.getAppendableIndexSpec(),
indexTuningConfig.getMaxRowsPerSegment(),
indexTuningConfig.getMaxBytesInMemory(),
indexTuningConfig.getMaxTotalRows(),
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 16e03f1..f351a5d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -74,6 +74,7 @@ import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.BatchIOConfig;
@@ -1110,7 +1111,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
}
}
- public static class IndexTuningConfig implements TuningConfig, AppenderatorConfig
+ public static class IndexTuningConfig implements AppenderatorConfig
{
private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
private static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
@@ -1118,6 +1119,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false;
private static final long DEFAULT_PUSH_TIMEOUT = 0;
+ private final AppendableIndexSpec appendableIndexSpec;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
@@ -1189,6 +1191,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
public IndexTuningConfig(
@JsonProperty("targetPartitionSize") @Deprecated @Nullable Integer targetPartitionSize,
@JsonProperty("maxRowsPerSegment") @Deprecated @Nullable Integer maxRowsPerSegment,
+ @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows,
@@ -1211,6 +1214,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
)
{
this(
+ appendableIndexSpec,
maxRowsInMemory != null ? maxRowsInMemory : rowFlushBoundary_forBackCompatibility,
maxBytesInMemory != null ? maxBytesInMemory : 0,
getPartitionsSpec(
@@ -1242,10 +1246,11 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
private IndexTuningConfig()
{
- this(null, null, null, null, null, null, null, null, null, null, null, null, null, null);
+ this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
}
private IndexTuningConfig(
+ @Nullable AppendableIndexSpec appendableIndexSpec,
@Nullable Integer maxRowsInMemory,
@Nullable Long maxBytesInMemory,
@Nullable PartitionsSpec partitionsSpec,
@@ -1262,9 +1267,10 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
@Nullable Integer maxSavedParseExceptions
)
{
+ this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
this.maxRowsInMemory = maxRowsInMemory == null ? TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
// initializing this to 0, it will be lazily initialized to a value
- // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
+ // @see #getMaxBytesInMemoryOrDefault()
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
this.partitionsSpec = partitionsSpec;
this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
@@ -1300,6 +1306,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
public IndexTuningConfig withBasePersistDirectory(File dir)
{
return new IndexTuningConfig(
+ appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
partitionsSpec,
@@ -1320,6 +1327,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
public IndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec)
{
return new IndexTuningConfig(
+ appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
partitionsSpec,
@@ -1339,6 +1347,13 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
@JsonProperty
@Override
+ public AppendableIndexSpec getAppendableIndexSpec()
+ {
+ return appendableIndexSpec;
+ }
+
+ @JsonProperty
+ @Override
public int getMaxRowsInMemory()
{
return maxRowsInMemory;
@@ -1514,7 +1529,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
return false;
}
IndexTuningConfig that = (IndexTuningConfig) o;
- return maxRowsInMemory == that.maxRowsInMemory &&
+ return Objects.equals(appendableIndexSpec, that.appendableIndexSpec) &&
+ maxRowsInMemory == that.maxRowsInMemory &&
maxBytesInMemory == that.maxBytesInMemory &&
maxPendingPersists == that.maxPendingPersists &&
forceGuaranteedRollup == that.forceGuaranteedRollup &&
@@ -1534,6 +1550,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
public int hashCode()
{
return Objects.hash(
+ appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
partitionsSpec,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index d5d8a8f..5cbfc01 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -929,6 +929,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
return new IndexTuningConfig(
null,
null,
+ tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
tuningConfig.getMaxBytesInMemory(),
null,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
index 935eeb4..ca7fdf1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java
@@ -28,6 +28,7 @@ import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Duration;
import org.joda.time.Period;
@@ -97,6 +98,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
null,
null,
null,
+ null,
null
);
}
@@ -105,6 +107,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
public ParallelIndexTuningConfig(
@JsonProperty("targetPartitionSize") @Deprecated @Nullable Integer targetPartitionSize,
@JsonProperty("maxRowsPerSegment") @Deprecated @Nullable Integer maxRowsPerSegment,
+ @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows,
@@ -134,6 +137,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
super(
targetPartitionSize,
maxRowsPerSegment,
+ appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
maxTotalRows,
@@ -248,6 +252,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
return new ParallelIndexTuningConfig(
null,
null,
+ getAppendableIndexSpec(),
getMaxRowsInMemory(),
getMaxBytesInMemory(),
null,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
index c192703..b56d8e5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
@@ -32,11 +33,12 @@ import javax.annotation.Nullable;
import java.io.File;
import java.util.Objects;
-public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfig, AppenderatorConfig
+public abstract class SeekableStreamIndexTaskTuningConfig implements AppenderatorConfig
{
private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false;
private static final boolean DEFAULT_SKIP_SEQUENCE_NUMBER_AVAILABILITY_CHECK = false;
+ private final AppendableIndexSpec appendableIndexSpec;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final DynamicPartitionsSpec partitionsSpec;
@@ -59,6 +61,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
private final int maxSavedParseExceptions;
public SeekableStreamIndexTaskTuningConfig(
+ @Nullable AppendableIndexSpec appendableIndexSpec,
@Nullable Integer maxRowsInMemory,
@Nullable Long maxBytesInMemory,
@Nullable Integer maxRowsPerSegment,
@@ -84,10 +87,11 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
// Cannot be a static because default basePersistDirectory is unique per-instance
final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory);
+ this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory;
this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows);
// initializing this to 0, it will be lazily initialized to a value
- // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
+ // @see #getMaxBytesInMemoryOrDefault()
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
this.intermediatePersistPeriod = intermediatePersistPeriod == null
? defaults.getIntermediatePersistPeriod()
@@ -132,6 +136,13 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
@Override
@JsonProperty
+ public AppendableIndexSpec getAppendableIndexSpec()
+ {
+ return appendableIndexSpec;
+ }
+
+ @Override
+ @JsonProperty
public int getMaxRowsInMemory()
{
return maxRowsInMemory;
@@ -281,7 +292,8 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
return false;
}
SeekableStreamIndexTaskTuningConfig that = (SeekableStreamIndexTaskTuningConfig) o;
- return maxRowsInMemory == that.maxRowsInMemory &&
+ return Objects.equals(appendableIndexSpec, that.appendableIndexSpec) &&
+ maxRowsInMemory == that.maxRowsInMemory &&
maxBytesInMemory == that.maxBytesInMemory &&
maxPendingPersists == that.maxPendingPersists &&
reportParseExceptions == that.reportParseExceptions &&
@@ -304,6 +316,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi
public int hashCode()
{
return Objects.hash(
+ appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
partitionsSpec,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
index a9d82b1..9a8ae20 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java
@@ -1394,6 +1394,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
null
);
RealtimeAppenderatorTuningConfig tuningConfig = new RealtimeAppenderatorTuningConfig(
+ null,
1000,
null,
maxRowsPerSegment,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index 0692c60..0daffc6 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -204,6 +204,7 @@ public class ClientCompactionTaskQuerySerdeTest
new ParallelIndexTuningConfig(
null,
null,
+ null,
40000,
2000L,
null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 08cd765..1fce094 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -279,6 +279,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
null,
null,
null,
+ null,
new HashedPartitionsSpec(null, 3, null),
null,
null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index c82f1e7..14c0840 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -305,6 +305,7 @@ public class CompactionTaskTest
return new ParallelIndexTuningConfig(
null,
null, // null to compute maxRowsPerSegment automatically
+ null,
500000,
1000000L,
null,
@@ -438,6 +439,7 @@ public class CompactionTaskTest
new IndexTuningConfig(
null,
null, // null to compute maxRowsPerSegment automatically
+ null,
500000,
1000000L,
null,
@@ -583,6 +585,7 @@ public class CompactionTaskTest
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
100000,
null,
+ null,
500000,
1000000L,
null,
@@ -650,6 +653,7 @@ public class CompactionTaskTest
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
null,
null,
+ null,
500000,
1000000L,
1000000L,
@@ -717,6 +721,7 @@ public class CompactionTaskTest
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
null,
null,
+ null,
500000,
1000000L,
null,
@@ -1105,6 +1110,7 @@ public class CompactionTaskTest
new ParallelIndexTuningConfig(
null,
null,
+ null,
500000,
1000000L,
Long.MAX_VALUE,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java
index 23f5748..bf2e444 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java
@@ -59,6 +59,7 @@ public class IndexTaskSerdeTest
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
null,
null,
+ null,
100,
2000L,
null,
@@ -92,6 +93,7 @@ public class IndexTaskSerdeTest
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
null,
null,
+ null,
100,
2000L,
null,
@@ -125,6 +127,7 @@ public class IndexTaskSerdeTest
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
null,
1000,
+ null,
100,
2000L,
3000L,
@@ -158,6 +161,7 @@ public class IndexTaskSerdeTest
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
null,
null,
+ null,
100,
2000L,
null,
@@ -193,6 +197,7 @@ public class IndexTaskSerdeTest
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
null,
null,
+ null,
100,
2000L,
null,
@@ -227,6 +232,7 @@ public class IndexTaskSerdeTest
final IndexTuningConfig tuningConfig = new IndexTuningConfig(
null,
null,
+ null,
100,
2000L,
null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index fdd3096..0d4810a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
+import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.CsvInputFormat;
@@ -1122,6 +1123,7 @@ public class IndexTaskTest extends IngestionTestBase
null,
null,
null,
+ null,
new HashedPartitionsSpec(2, null, null),
INDEX_SPEC,
null,
@@ -1250,6 +1252,7 @@ public class IndexTaskTest extends IngestionTestBase
null,
null,
null,
+ null,
new DynamicPartitionsSpec(2, null),
INDEX_SPEC,
null,
@@ -1370,6 +1373,7 @@ public class IndexTaskTest extends IngestionTestBase
null,
null,
null,
+ null,
new HashedPartitionsSpec(2, null, null),
INDEX_SPEC,
null,
@@ -1808,6 +1812,7 @@ public class IndexTaskTest extends IngestionTestBase
return new IndexTuningConfig(
null,
maxRowsPerSegment,
+ null,
maxRowsInMemory,
maxBytesInMemory,
maxTotalRows,
@@ -1998,4 +2003,12 @@ public class IndexTaskTest extends IngestionTestBase
);
}
}
+
+ @Test
+ public void testEqualsAndHashCode()
+ {
+ EqualsVerifier.forClass(IndexTuningConfig.class)
+ .usingGetClass()
+ .verify();
+ }
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
index b87dcf7..2038441 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -830,6 +830,7 @@ public class RealtimeIndexTaskTest extends InitializedNullHandlingTest
null
);
RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(
+ null,
1000,
null,
new Period("P1Y"),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
index 9dfbd4e..aa3d33b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
@@ -249,6 +249,7 @@ public class TaskSerdeTest
new IndexTuningConfig(
null,
null,
+ null,
10,
null,
null,
@@ -329,6 +330,7 @@ public class TaskSerdeTest
new IndexTuningConfig(
null,
null,
+ null,
10,
null,
null,
@@ -396,6 +398,7 @@ public class TaskSerdeTest
),
new RealtimeTuningConfig(
+ null,
1,
10L,
new Period("PT10M"),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index d87d1f0..45b0f38 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -157,6 +157,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
null,
null,
null,
+ null,
2,
null,
null,
@@ -227,6 +228,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
null,
null,
null,
+ null,
new MaxSizeSplitHintSpec(null, 1),
partitionsSpec,
null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
index 37fb265..fc193b0 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java
@@ -178,6 +178,7 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
null,
null,
null,
+ null,
numTotalSubTasks,
null,
null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
index 0a30bf7..be6b0a8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java
@@ -427,6 +427,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
null,
null,
null,
+ null,
NUM_SUB_TASKS,
null,
null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
index e6b144f..a80e3a8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java
@@ -271,6 +271,7 @@ public class ParallelIndexSupervisorTaskSerdeTest
null,
null,
null,
+ null,
partitionsSpec,
null,
null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
index 710df1d..6088235 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
@@ -195,6 +195,7 @@ public class ParallelIndexSupervisorTaskTest
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
null,
null,
+ null,
10,
1000L,
null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java
index 931fd24..c2f7123 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java
@@ -158,6 +158,7 @@ class ParallelIndexTestingFactory
return new ParallelIndexTuningConfig(
1,
null,
+ null,
3,
4L,
5L,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java
index 101c974..a826ae9 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
+import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
@@ -69,6 +70,7 @@ public class ParallelIndexTuningConfigTest
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
null,
null,
+ null,
10,
1000L,
null,
@@ -111,6 +113,7 @@ public class ParallelIndexTuningConfigTest
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
null,
null,
+ null,
10,
1000L,
null,
@@ -153,6 +156,7 @@ public class ParallelIndexTuningConfigTest
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
null,
null,
+ null,
10,
1000L,
null,
@@ -197,6 +201,7 @@ public class ParallelIndexTuningConfigTest
final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig(
null,
null,
+ null,
10,
1000L,
null,
@@ -238,6 +243,7 @@ public class ParallelIndexTuningConfigTest
new ParallelIndexTuningConfig(
null,
null,
+ null,
10,
1000L,
null,
@@ -279,6 +285,7 @@ public class ParallelIndexTuningConfigTest
new ParallelIndexTuningConfig(
null,
null,
+ null,
10,
1000L,
null,
@@ -320,6 +327,7 @@ public class ParallelIndexTuningConfigTest
new ParallelIndexTuningConfig(
null,
null,
+ null,
10,
1000L,
null,
@@ -351,4 +359,12 @@ public class ParallelIndexTuningConfigTest
null
);
}
+
+ @Test
+ public void testEqualsAndHashCode()
+ {
+ EqualsVerifier.forClass(ParallelIndexTuningConfig.class)
+ .usingGetClass()
+ .verify();
+ }
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index b7f107d..e444f80 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -352,6 +352,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
null,
null,
null,
+ null,
1,
null,
null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 99ab9a4..48f75d7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -746,6 +746,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
new IndexTuningConfig(
null,
10000,
+ null,
10,
null,
null,
@@ -826,6 +827,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
new IndexTuningConfig(
null,
10000,
+ null,
10,
null,
null,
@@ -1251,6 +1253,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
new IndexTuningConfig(
null,
10000,
+ null,
10,
null,
null,
@@ -1358,6 +1361,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
new IndexTuningConfig(
null,
10000,
+ null,
10,
null,
null,
@@ -1468,6 +1472,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
// PlumberSchool - Realtime Index Task always uses RealtimePlumber which is hardcoded in RealtimeIndexTask class
);
RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(
+ null,
1000,
null,
new Period("P1Y"),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 6bfbd7f..2cefebd 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -898,6 +898,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
null,
null,
null,
+ null,
null
)
{
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java b/processing/src/main/java/org/apache/druid/jackson/AppendableIndexModule.java
similarity index 53%
copy from server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
copy to processing/src/main/java/org/apache/druid/jackson/AppendableIndexModule.java
index 6d02ca4..177c7a5 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
+++ b/processing/src/main/java/org/apache/druid/jackson/AppendableIndexModule.java
@@ -17,26 +17,28 @@
* under the License.
*/
-package org.apache.druid.segment.indexing;
+package org.apache.druid.jackson;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.druid.utils.JvmUtils;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
-/**
- */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
-@JsonSubTypes(value = {
- @JsonSubTypes.Type(name = "realtime", value = RealtimeTuningConfig.class)
-})
-public interface TuningConfig
+public class AppendableIndexModule extends SimpleModule
{
- boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
- int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
- int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
- int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
- // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only
- // tracks active index and not the index being flushed to disk, to account for that
- // we halved default to 1/6(max jvm memory)
- long DEFAULT_MAX_BYTES_IN_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+ public AppendableIndexModule()
+ {
+ super("AppendableIndexFactories");
+
+ setMixInAnnotation(AppendableIndexSpec.class, AppendableIndexSpecMixin.class);
+ }
+
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = OnheapIncrementalIndex.Spec.class)
+ @JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = OnheapIncrementalIndex.Spec.TYPE, value = OnheapIncrementalIndex.Spec.class),
+ })
+ public interface AppendableIndexSpecMixin
+ {
+ }
}
diff --git a/processing/src/main/java/org/apache/druid/jackson/DefaultObjectMapper.java b/processing/src/main/java/org/apache/druid/jackson/DefaultObjectMapper.java
index 235d467..71cf30f 100644
--- a/processing/src/main/java/org/apache/druid/jackson/DefaultObjectMapper.java
+++ b/processing/src/main/java/org/apache/druid/jackson/DefaultObjectMapper.java
@@ -49,6 +49,7 @@ public class DefaultObjectMapper extends ObjectMapper
registerModule(new AggregatorsModule());
registerModule(new StringComparatorModule());
registerModule(new SegmentizerModule());
+ registerModule(new AppendableIndexModule());
configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
configure(MapperFeature.AUTO_DETECT_GETTERS, false);
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryHelper.java
index 877fd34..f0b2a9e 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryHelper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryHelper.java
@@ -39,9 +39,12 @@ import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.segment.incremental.AppendableIndexBuilder;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OffheapIncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
@@ -117,24 +120,24 @@ public class GroupByQueryHelper
.withMinTimestamp(granTimeStart.getMillis())
.build();
+
+ final AppendableIndexBuilder indexBuilder;
+
if (query.getContextValue("useOffheap", false)) {
- index = new IncrementalIndex.Builder()
- .setIndexSchema(indexSchema)
- .setDeserializeComplexMetrics(false)
- .setConcurrentEventAdd(true)
- .setSortFacts(sortResults)
- .setMaxRowCount(querySpecificConfig.getMaxResults())
- .buildOffheap(bufferPool);
+ indexBuilder = new OffheapIncrementalIndex.Builder()
+ .setBufferPool(bufferPool);
} else {
- index = new IncrementalIndex.Builder()
- .setIndexSchema(indexSchema)
- .setDeserializeComplexMetrics(false)
- .setConcurrentEventAdd(true)
- .setSortFacts(sortResults)
- .setMaxRowCount(querySpecificConfig.getMaxResults())
- .buildOnheap();
+ indexBuilder = new OnheapIncrementalIndex.Builder();
}
+ index = indexBuilder
+ .setIndexSchema(indexSchema)
+ .setDeserializeComplexMetrics(false)
+ .setConcurrentEventAdd(true)
+ .setSortFacts(sortResults)
+ .setMaxRowCount(querySpecificConfig.getMaxResults())
+ .build();
+
Accumulator<IncrementalIndex, T> accumulator = new Accumulator<IncrementalIndex, T>()
{
@Override
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
new file mode 100644
index 0000000..220f0e3
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.incremental;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+
+import javax.annotation.Nullable;
+
+public abstract class AppendableIndexBuilder
+{
+ @Nullable
+ protected IncrementalIndexSchema incrementalIndexSchema = null;
+ protected boolean deserializeComplexMetrics = true;
+ protected boolean concurrentEventAdd = false;
+ protected boolean sortFacts = true;
+ protected int maxRowCount = 0;
+ protected long maxBytesInMemory = 0;
+
+ protected final Logger log = new Logger(this.getClass().getName());
+
+ public AppendableIndexBuilder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
+ {
+ this.incrementalIndexSchema = incrementalIndexSchema;
+ return this;
+ }
+
+ /**
+ * A helper method to set a simple index schema with only metrics and default values for the other parameters. Note
+ * that this method is normally used for testing and benchmarking; it is unlikely that you would use it in
+ * production settings.
+ *
+ * @param metrics variable array of {@link AggregatorFactory} metrics
+ *
+ * @return this
+ */
+ @VisibleForTesting
+ public AppendableIndexBuilder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
+ {
+ return setSimpleTestingIndexSchema(null, metrics);
+ }
+
+
+ /**
+ * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
+ * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
+ * would use it in production settings.
+ *
+ * @param metrics variable array of {@link AggregatorFactory} metrics
+ *
+ * @return this
+ */
+ @VisibleForTesting
+ public AppendableIndexBuilder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
+ {
+ IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
+ this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
+ return this;
+ }
+
+ public AppendableIndexBuilder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
+ {
+ this.deserializeComplexMetrics = deserializeComplexMetrics;
+ return this;
+ }
+
+ public AppendableIndexBuilder setConcurrentEventAdd(final boolean concurrentEventAdd)
+ {
+ this.concurrentEventAdd = concurrentEventAdd;
+ return this;
+ }
+
+ public AppendableIndexBuilder setSortFacts(final boolean sortFacts)
+ {
+ this.sortFacts = sortFacts;
+ return this;
+ }
+
+ public AppendableIndexBuilder setMaxRowCount(final int maxRowCount)
+ {
+ this.maxRowCount = maxRowCount;
+ return this;
+ }
+
+ public AppendableIndexBuilder setMaxBytesInMemory(final long maxBytesInMemory)
+ {
+ this.maxBytesInMemory = maxBytesInMemory;
+ return this;
+ }
+
+ public void validate()
+ {
+ if (maxRowCount <= 0) {
+ throw new IllegalArgumentException("Invalid max row count: " + maxRowCount);
+ }
+
+ if (incrementalIndexSchema == null) {
+ throw new IllegalArgumentException("incrementIndexSchema cannot be null");
+ }
+ }
+
+ public final IncrementalIndex build()
+ {
+ log.debug("Building appendable index.");
+ validate();
+ return buildInner();
+ }
+
+ protected abstract IncrementalIndex buildInner();
+}
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfigs.java b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
similarity index 53%
rename from server/src/main/java/org/apache/druid/segment/indexing/TuningConfigs.java
rename to processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
index 9c470ab..67cdabd 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfigs.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java
@@ -17,25 +17,19 @@
* under the License.
*/
-package org.apache.druid.segment.indexing;
+package org.apache.druid.segment.incremental;
-public class TuningConfigs
+import org.apache.druid.guice.annotations.UnstableApi;
+
+/**
+ * AppendableIndexSpec describes the in-memory indexing method for data ingestion.
+ */
+@UnstableApi
+public interface AppendableIndexSpec
{
- private TuningConfigs()
- {
- }
+ // Returns a builder of the appendable index.
+ AppendableIndexBuilder builder();
- public static long getMaxBytesInMemoryOrDefault(final long maxBytesInMemory)
- {
- // In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting
- // maxBytes to max jvm memory of the process that starts first. Instead we set the default based on
- // the actual task node's jvm memory.
- long newMaxBytesInMemory = maxBytesInMemory;
- if (maxBytesInMemory == 0) {
- newMaxBytesInMemory = TuningConfig.DEFAULT_MAX_BYTES_IN_MEMORY;
- } else if (maxBytesInMemory < 0) {
- newMaxBytesInMemory = Long.MAX_VALUE;
- }
- return newMaxBytesInMemory;
- }
+ // Returns the default max bytes in memory for this index.
+ long getDefaultMaxBytesInMemory();
}
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
index a718b53..cc338b9 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java
@@ -31,7 +31,6 @@ import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.google.errorprone.annotations.concurrent.GuardedBy;
-import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedRow;
@@ -79,7 +78,6 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
@@ -88,7 +86,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
@@ -319,126 +316,62 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
}
}
- public static class Builder
+ /**
+ * This class exists only as backward competability to reduce the number of modified lines.
+ */
+ public static class Builder extends OnheapIncrementalIndex.Builder
{
- @Nullable
- private IncrementalIndexSchema incrementalIndexSchema;
- private boolean deserializeComplexMetrics;
- private boolean concurrentEventAdd;
- private boolean sortFacts;
- private int maxRowCount;
- private long maxBytesInMemory;
-
- public Builder()
- {
- incrementalIndexSchema = null;
- deserializeComplexMetrics = true;
- concurrentEventAdd = false;
- sortFacts = true;
- maxRowCount = 0;
- maxBytesInMemory = 0;
- }
-
+ @Override
public Builder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
{
- this.incrementalIndexSchema = incrementalIndexSchema;
- return this;
+ return (Builder) super.setIndexSchema(incrementalIndexSchema);
}
- /**
- * A helper method to set a simple index schema with only metrics and default values for the other parameters. Note
- * that this method is normally used for testing and benchmarking; it is unlikely that you would use it in
- * production settings.
- *
- * @param metrics variable array of {@link AggregatorFactory} metrics
- *
- * @return this
- */
- @VisibleForTesting
+ @Override
public Builder setSimpleTestingIndexSchema(final AggregatorFactory... metrics)
{
- return setSimpleTestingIndexSchema(null, metrics);
+ return (Builder) super.setSimpleTestingIndexSchema(metrics);
}
-
- /**
- * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the
- * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you
- * would use it in production settings.
- *
- * @param metrics variable array of {@link AggregatorFactory} metrics
- *
- * @return this
- */
- @VisibleForTesting
+ @Override
public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... metrics)
{
- IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics);
- this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build();
- return this;
+ return (Builder) super.setSimpleTestingIndexSchema(rollup, metrics);
}
+ @Override
public Builder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics)
{
- this.deserializeComplexMetrics = deserializeComplexMetrics;
- return this;
+ return (Builder) super.setDeserializeComplexMetrics(deserializeComplexMetrics);
}
+ @Override
public Builder setConcurrentEventAdd(final boolean concurrentEventAdd)
{
- this.concurrentEventAdd = concurrentEventAdd;
- return this;
+ return (Builder) super.setConcurrentEventAdd(concurrentEventAdd);
}
+ @Override
public Builder setSortFacts(final boolean sortFacts)
{
- this.sortFacts = sortFacts;
- return this;
+ return (Builder) super.setSortFacts(sortFacts);
}
+ @Override
public Builder setMaxRowCount(final int maxRowCount)
{
- this.maxRowCount = maxRowCount;
- return this;
+ return (Builder) super.setMaxRowCount(maxRowCount);
}
- //maxBytesInMemory only applies to OnHeapIncrementalIndex
+ @Override
public Builder setMaxBytesInMemory(final long maxBytesInMemory)
{
- this.maxBytesInMemory = maxBytesInMemory;
- return this;
+ return (Builder) super.setMaxBytesInMemory(maxBytesInMemory);
}
public OnheapIncrementalIndex buildOnheap()
{
- if (maxRowCount <= 0) {
- throw new IllegalArgumentException("Invalid max row count: " + maxRowCount);
- }
-
- return new OnheapIncrementalIndex(
- Objects.requireNonNull(incrementalIndexSchema, "incrementIndexSchema is null"),
- deserializeComplexMetrics,
- concurrentEventAdd,
- sortFacts,
- maxRowCount,
- maxBytesInMemory
- );
- }
-
- public IncrementalIndex buildOffheap(final NonBlockingPool<ByteBuffer> bufferPool)
- {
- if (maxRowCount <= 0) {
- throw new IllegalArgumentException("Invalid max row count: " + maxRowCount);
- }
-
- return new OffheapIncrementalIndex(
- Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"),
- deserializeComplexMetrics,
- concurrentEventAdd,
- sortFacts,
- maxRowCount,
- Objects.requireNonNull(bufferPool, "bufferPool is null")
- );
+ return (OnheapIncrementalIndex) build();
}
}
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
index 97da994..b3cdabc 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java
@@ -40,6 +40,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -115,6 +116,8 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
selectors = new HashMap<>();
aggOffsetInBuffer = new int[metrics.length];
+ int aggsCurOffsetInBuffer = 0;
+
for (int i = 0; i < metrics.length; i++) {
AggregatorFactory agg = metrics[i];
@@ -129,15 +132,11 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
new OnheapIncrementalIndex.CachingColumnSelectorFactory(columnSelectorFactory, concurrentEventAdd)
);
- if (i == 0) {
- aggOffsetInBuffer[i] = 0;
- } else {
- aggOffsetInBuffer[i] = aggOffsetInBuffer[i - 1] + metrics[i - 1].getMaxIntermediateSizeWithNulls();
- }
+ aggOffsetInBuffer[i] = aggsCurOffsetInBuffer;
+ aggsCurOffsetInBuffer += agg.getMaxIntermediateSizeWithNulls();
}
- aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length
- - 1].getMaxIntermediateSizeWithNulls();
+ aggsTotalSize = aggsCurOffsetInBuffer;
return new BufferAggregator[metrics.length];
}
@@ -346,4 +345,38 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
}
aggBuffers.clear();
}
+
+ public static class Builder extends AppendableIndexBuilder
+ {
+ @Nullable
+ NonBlockingPool<ByteBuffer> bufferPool = null;
+
+ public Builder setBufferPool(final NonBlockingPool<ByteBuffer> bufferPool)
+ {
+ this.bufferPool = bufferPool;
+ return this;
+ }
+
+ @Override
+ public void validate()
+ {
+ super.validate();
+ if (bufferPool == null) {
+ throw new IllegalArgumentException("bufferPool cannot be null");
+ }
+ }
+
+ @Override
+ protected OffheapIncrementalIndex buildInner()
+ {
+ return new OffheapIncrementalIndex(
+ Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"),
+ deserializeComplexMetrics,
+ concurrentEventAdd,
+ sortFacts,
+ maxRowCount,
+ Objects.requireNonNull(bufferPool, "bufferPool is null")
+ );
+ }
+ }
}
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
index 9bbdf60..a72124a 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java
@@ -32,6 +32,7 @@ import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.utils.JvmUtils;
import javax.annotation.Nullable;
import java.io.IOException;
@@ -40,6 +41,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -434,4 +436,51 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
}
}
+ public static class Builder extends AppendableIndexBuilder
+ {
+ @Override
+ protected OnheapIncrementalIndex buildInner()
+ {
+ return new OnheapIncrementalIndex(
+ Objects.requireNonNull(incrementalIndexSchema, "incrementIndexSchema is null"),
+ deserializeComplexMetrics,
+ concurrentEventAdd,
+ sortFacts,
+ maxRowCount,
+ maxBytesInMemory
+ );
+ }
+ }
+
+ public static class Spec implements AppendableIndexSpec
+ {
+ public static final String TYPE = "onheap";
+
+ @Override
+ public AppendableIndexBuilder builder()
+ {
+ return new Builder();
+ }
+
+ @Override
+ public long getDefaultMaxBytesInMemory()
+ {
+ // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only
+ // tracks active index and not the index being flushed to disk, to account for that
+ // we halved default to 1/6(max jvm memory)
+ return JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+ }
+
+ @Override
+ public boolean equals(Object that)
+ {
+ return that.getClass().equals(this.getClass());
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(this.getClass());
+ }
+ }
}
diff --git a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
index dfc3e53..534034b 100644
--- a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java
@@ -63,9 +63,10 @@ import org.apache.druid.segment.CloserRule;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.incremental.IncrementalIndex;
-import org.apache.druid.segment.incremental.IncrementalIndex.Builder;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
+import org.apache.druid.segment.incremental.OffheapIncrementalIndex;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.joda.time.Interval;
import org.junit.AfterClass;
@@ -130,10 +131,11 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
RESOURCE_CLOSER.register(pool1);
params.add(
new Object[] {
- (IndexCreator) factories -> new Builder()
+ (IndexCreator) factories -> new OffheapIncrementalIndex.Builder()
+ .setBufferPool(pool1)
.setSimpleTestingIndexSchema(factories)
.setMaxRowCount(1000000)
- .buildOffheap(pool1)
+ .build()
}
);
params.add(new Object[] {(IndexCreator) IncrementalIndexTest::createNoRollupIndex});
@@ -144,7 +146,8 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
RESOURCE_CLOSER.register(pool2);
params.add(
new Object[] {
- (IndexCreator) factories -> new Builder()
+ (IndexCreator) factories -> new OffheapIncrementalIndex.Builder()
+ .setBufferPool(pool2)
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMetrics(factories)
@@ -152,7 +155,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
.build()
)
.setMaxRowCount(1000000)
- .buildOffheap(pool2)
+ .build()
}
);
@@ -173,7 +176,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
aggregatorFactories = DEFAULT_AGGREGATOR_FACTORIES;
}
- return new IncrementalIndex.Builder()
+ return new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withDimensionsSpec(dimensionsSpec)
@@ -181,7 +184,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
.build()
)
.setMaxRowCount(1000000)
- .buildOnheap();
+ .build();
}
public static IncrementalIndex createIndex(AggregatorFactory[] aggregatorFactories)
@@ -190,10 +193,10 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
aggregatorFactories = DEFAULT_AGGREGATOR_FACTORIES;
}
- return new IncrementalIndex.Builder()
+ return new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(aggregatorFactories)
.setMaxRowCount(1000000)
- .buildOnheap();
+ .build();
}
public static IncrementalIndex createNoRollupIndex(AggregatorFactory[] aggregatorFactories)
@@ -202,10 +205,10 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
aggregatorFactories = DEFAULT_AGGREGATOR_FACTORIES;
}
- return new IncrementalIndex.Builder()
+ return new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(false, aggregatorFactories)
.setMaxRowCount(1000000)
- .buildOnheap();
+ .build();
}
public static void populateIndex(long timestamp, IncrementalIndex index) throws IndexSizeExceededException
@@ -722,7 +725,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
@Test
public void testgetDimensions()
{
- final IncrementalIndex<Aggregator> incrementalIndex = new IncrementalIndex.Builder()
+ final IncrementalIndex<Aggregator> incrementalIndex = (OnheapIncrementalIndex) new OnheapIncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMetrics(new CountAggregatorFactory("count"))
@@ -736,7 +739,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
.build()
)
.setMaxRowCount(1000000)
- .buildOnheap();
+ .build();
closerRule.closeLater(incrementalIndex);
Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames());
@@ -745,10 +748,10 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
@Test
public void testDynamicSchemaRollup() throws IndexSizeExceededException
{
- IncrementalIndex<Aggregator> index = new IncrementalIndex.Builder()
+ IncrementalIndex<Aggregator> index = (OnheapIncrementalIndex) new OnheapIncrementalIndex.Builder()
.setSimpleTestingIndexSchema(/* empty */)
.setMaxRowCount(10)
- .buildOnheap();
+ .build();
closerRule.closeLater(index);
index.add(
new MapBasedInputRow(
diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
index 91863ff..c7b885a 100644
--- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java
@@ -117,12 +117,12 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
@Override
public IncrementalIndex createIndex()
{
- return new IncrementalIndex.Builder()
+ return new OnheapIncrementalIndex.Builder()
.setIndexSchema(schema)
.setDeserializeComplexMetrics(false)
.setSortFacts(sortFacts)
.setMaxRowCount(1000)
- .buildOnheap();
+ .build();
}
},
Closer.create()
@@ -141,11 +141,12 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
@Override
public IncrementalIndex createIndex()
{
- return new IncrementalIndex.Builder()
+ return new OffheapIncrementalIndex.Builder()
+ .setBufferPool(stupidPool)
.setIndexSchema(schema)
.setSortFacts(sortFacts)
.setMaxRowCount(1000000)
- .buildOffheap(stupidPool);
+ .build();
}
},
poolCloser
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
index 728e2ff..48fb592 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
import org.apache.druid.segment.realtime.plumber.IntervalStartVersioningPolicy;
import org.apache.druid.segment.realtime.plumber.RejectionPolicyFactory;
@@ -41,9 +42,8 @@ import java.io.File;
/**
*
*/
-public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
+public class RealtimeTuningConfig implements AppenderatorConfig
{
- private static final int DEFAULT_MAX_ROWS_IN_MEMORY = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY;
private static final Period DEFAULT_INTERMEDIATE_PERSIST_PERIOD = new Period("PT10M");
private static final Period DEFAULT_WINDOW_PERIOD = new Period("PT10M");
private static final VersioningPolicy DEFAULT_VERSIONING_POLICY = new IntervalStartVersioningPolicy();
@@ -65,6 +65,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File basePersistDirectory)
{
return new RealtimeTuningConfig(
+ DEFAULT_APPENDABLE_INDEX,
DEFAULT_MAX_ROWS_IN_MEMORY,
0L,
DEFAULT_INTERMEDIATE_PERSIST_PERIOD,
@@ -87,6 +88,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
);
}
+ private final AppendableIndexSpec appendableIndexSpec;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final Period intermediatePersistPeriod;
@@ -110,6 +112,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
@JsonCreator
public RealtimeTuningConfig(
+ @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@@ -132,9 +135,10 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
@JsonProperty("dedupColumn") @Nullable String dedupColumn
)
{
+ this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
// initializing this to 0, it will be lazily initialized to a value
- // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
+ // @see #getMaxBytesInMemoryOrDefault()
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
this.intermediatePersistPeriod = intermediatePersistPeriod == null
? DEFAULT_INTERMEDIATE_PERSIST_PERIOD
@@ -168,6 +172,13 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
@Override
@JsonProperty
+ public AppendableIndexSpec getAppendableIndexSpec()
+ {
+ return appendableIndexSpec;
+ }
+
+ @Override
+ @JsonProperty
public int getMaxRowsInMemory()
{
return maxRowsInMemory;
@@ -304,6 +315,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
{
return new RealtimeTuningConfig(
+ appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
intermediatePersistPeriod,
@@ -330,6 +342,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
public RealtimeTuningConfig withBasePersistDirectory(File dir)
{
return new RealtimeTuningConfig(
+ appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
intermediatePersistPeriod,
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
index 6d02ca4..e3a4e1f 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
@@ -21,7 +21,8 @@ package org.apache.druid.segment.indexing;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.druid.utils.JvmUtils;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
/**
*/
@@ -32,11 +33,37 @@ import org.apache.druid.utils.JvmUtils;
public interface TuningConfig
{
boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
+ AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new OnheapIncrementalIndex.Spec();
int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
- // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only
- // tracks active index and not the index being flushed to disk, to account for that
- // we halved default to 1/6(max jvm memory)
- long DEFAULT_MAX_BYTES_IN_MEMORY = JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6;
+
+ /**
+ * The incremental index implementation to use
+ */
+ AppendableIndexSpec getAppendableIndexSpec();
+
+ /**
+ * Maximum number of bytes (estimated) to store in memory before persisting to local storage
+ */
+ long getMaxBytesInMemory();
+
+ /**
+ * Maximum number of bytes (estimated) to store in memory before persisting to local storage.
+ * If getMaxBytesInMemory() returns 0, the appendable index default will be used.
+ */
+ default long getMaxBytesInMemoryOrDefault()
+ {
+ // In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting
+ // maxBytes to max jvm memory of the process that starts first. Instead we set the default based on
+ // the actual task node's jvm memory.
+ final long maxBytesInMemory = getMaxBytesInMemory();
+ if (maxBytesInMemory > 0) {
+ return maxBytesInMemory;
+ } else if (maxBytesInMemory == 0) {
+ return getAppendableIndexSpec().getDefaultMaxBytesInMemory();
+ } else {
+ return Long.MAX_VALUE;
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
index f8d09e0..259e58c 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
@@ -21,13 +21,14 @@ package org.apache.druid.segment.realtime.appenderator;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Period;
import javax.annotation.Nullable;
import java.io.File;
-public interface AppenderatorConfig
+public interface AppenderatorConfig extends TuningConfig
{
boolean isReportParseExceptions();
@@ -36,11 +37,6 @@ public interface AppenderatorConfig
*/
int getMaxRowsInMemory();
- /**
- * Maximum number of bytes (estimated) to store in memory before persisting to local storage
- */
- long getMaxBytesInMemory();
-
int getMaxPendingPersists();
/**
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
index c65c2ad..c288837 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -64,7 +64,6 @@ import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.TuningConfigs;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.FireHydrant;
@@ -199,7 +198,7 @@ public class AppenderatorImpl implements Appenderator
this.sinkTimeline = sinkQuerySegmentWalker.getSinkTimeline();
}
- maxBytesTuningConfig = TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory());
+ maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault();
}
@Override
@@ -404,6 +403,7 @@ public class AppenderatorImpl implements Appenderator
schema,
identifier.getShardSpec(),
identifier.getVersion(),
+ tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
maxBytesTuningConfig,
null
@@ -1121,6 +1121,7 @@ public class AppenderatorImpl implements Appenderator
schema,
identifier.getShardSpec(),
identifier.getVersion(),
+ tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
maxBytesTuningConfig,
null,
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index 7fae74c..02f75c7 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -49,6 +49,7 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.IndexableAdapter;
import org.apache.druid.segment.ProgressIndicator;
import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
@@ -383,6 +384,12 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
}
@Override
+ public AppendableIndexSpec getAppendableIndexSpec()
+ {
+ return baseConfig.getAppendableIndexSpec();
+ }
+
+ @Override
public int getMaxRowsInMemory()
{
return Integer.MAX_VALUE; // unlimited, rely on maxBytesInMemory instead
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
index bae600c..2aec758 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -61,7 +61,6 @@ import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.incremental.IndexSizeExceededException;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
-import org.apache.druid.segment.indexing.TuningConfigs;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
@@ -260,8 +259,9 @@ public class RealtimePlumber implements Plumber
schema,
config.getShardSpec(),
versioningPolicy.getVersion(sinkInterval),
+ config.getAppendableIndexSpec(),
config.getMaxRowsInMemory(),
- TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()),
+ config.getMaxBytesInMemoryOrDefault(),
config.getDedupColumn()
);
addSink(retVal);
@@ -723,8 +723,9 @@ public class RealtimePlumber implements Plumber
schema,
config.getShardSpec(),
versioningPolicy.getVersion(sinkInterval),
+ config.getAppendableIndexSpec(),
config.getMaxRowsInMemory(),
- TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()),
+ config.getMaxBytesInMemoryOrDefault(),
config.getDedupColumn(),
hydrants
);
diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
index d6cb223..ccf5e4c 100644
--- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
+++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java
@@ -30,6 +30,7 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexAddResult;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
@@ -63,6 +64,7 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
private final DataSchema schema;
private final ShardSpec shardSpec;
private final String version;
+ private final AppendableIndexSpec appendableIndexSpec;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<>();
@@ -79,6 +81,7 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
DataSchema schema,
ShardSpec shardSpec,
String version,
+ AppendableIndexSpec appendableIndexSpec,
int maxRowsInMemory,
long maxBytesInMemory,
String dedupColumn
@@ -89,6 +92,7 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
schema,
shardSpec,
version,
+ appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
dedupColumn,
@@ -101,6 +105,7 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
DataSchema schema,
ShardSpec shardSpec,
String version,
+ AppendableIndexSpec appendableIndexSpec,
int maxRowsInMemory,
long maxBytesInMemory,
String dedupColumn,
@@ -111,6 +116,7 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
this.shardSpec = shardSpec;
this.interval = interval;
this.version = version;
+ this.appendableIndexSpec = appendableIndexSpec;
this.maxRowsInMemory = maxRowsInMemory;
this.maxBytesInMemory = maxBytesInMemory;
this.dedupColumn = dedupColumn;
@@ -322,11 +328,13 @@ public class Sink implements Iterable<FireHydrant>, Overshadowable<Sink>
.withMetrics(schema.getAggregators())
.withRollup(schema.getGranularitySpec().isRollup())
.build();
- final IncrementalIndex newIndex = new IncrementalIndex.Builder()
+
+ // Build the incremental-index according to the spec that was chosen by the user
+ final IncrementalIndex newIndex = appendableIndexSpec.builder()
.setIndexSchema(indexSchema)
.setMaxRowCount(maxRowsInMemory)
.setMaxBytesInMemory(maxBytesInMemory)
- .buildOnheap();
+ .build();
final FireHydrant old;
synchronized (hydrantLock) {
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
index ea3f7cf..72f0b8b 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.hamcrest.CoreMatchers;
import org.joda.time.Period;
@@ -89,6 +90,7 @@ public class RealtimeTuningConfigTest
);
Assert.assertNotNull(config.getBasePersistDirectory());
+ Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(0, config.getHandoffConditionTimeout());
Assert.assertEquals(0, config.getAlertTimeout());
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
@@ -119,7 +121,8 @@ public class RealtimeTuningConfigTest
+ " \"handoffConditionTimeout\": 100,\n"
+ " \"alertTimeout\": 70,\n"
+ " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n"
- + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n"
+ + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" },\n"
+ + " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
+ "}";
ObjectMapper mapper = TestHelper.makeJsonMapper();
@@ -134,6 +137,7 @@ public class RealtimeTuningConfigTest
);
Assert.assertEquals("/tmp/xxx", config.getBasePersistDirectory().toString());
+ Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(100, config.getHandoffConditionTimeout());
Assert.assertEquals(70, config.getAlertTimeout());
Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
index ee79dc1..87df0e8 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
@@ -62,6 +62,7 @@ public class AppenderatorPlumberTest
EasyMock.anyObject())).andReturn(true).anyTimes();
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
+ null,
1,
null,
null,
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
index 9e57d28..c77d429 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
@@ -162,6 +162,7 @@ public class AppenderatorTester implements AutoCloseable
objectMapper
);
tuningConfig = new RealtimeTuningConfig(
+ null,
maxRowsInMemory,
maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes,
null,
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
index 9e9d350..6e38c1c 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java
@@ -134,6 +134,7 @@ public class DefaultOfflineAppenderatorFactoryTest
);
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
+ null,
75000,
null,
null,
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
index 0eb62e8..9b73cbe 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
@@ -48,7 +48,6 @@ import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
-import org.apache.druid.segment.indexing.TuningConfigs;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.DataSegmentPusher;
@@ -200,6 +199,7 @@ public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, handoffNotifierFactory, handoffNotifier, emitter);
tuningConfig = new RealtimeTuningConfig(
+ null,
1,
null,
null,
@@ -278,8 +278,9 @@ public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
schema,
tuningConfig.getShardSpec(),
DateTimes.of("2014-12-01T12:34:56.789").toString(),
+ tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
- TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
+ tuningConfig.getMaxBytesInMemoryOrDefault(),
tuningConfig.getDedupColumn()
);
plumber.getSinks().put(0L, sink);
@@ -323,8 +324,9 @@ public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
schema,
tuningConfig.getShardSpec(),
DateTimes.of("2014-12-01T12:34:56.789").toString(),
+ tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
- TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
+ tuningConfig.getMaxBytesInMemoryOrDefault(),
tuningConfig.getDedupColumn()
);
plumber.getSinks().put(0L, sink);
@@ -373,8 +375,9 @@ public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
schema2,
tuningConfig.getShardSpec(),
DateTimes.of("2014-12-01T12:34:56.789").toString(),
+ tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
- TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
+ tuningConfig.getMaxBytesInMemoryOrDefault(),
tuningConfig.getDedupColumn()
);
plumber2.getSinks().put(0L, sink);
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java
index bf94b2f..477c3bd 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java
@@ -34,7 +34,6 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
-import org.apache.druid.segment.indexing.TuningConfigs;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.FireHydrant;
import org.apache.druid.testing.InitializedNullHandlingTest;
@@ -66,6 +65,7 @@ public class SinkTest extends InitializedNullHandlingTest
final Interval interval = Intervals.of("2013-01-01/2013-01-02");
final String version = DateTimes.nowUtc().toString();
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
+ null,
100,
null,
new Period("P1Y"),
@@ -91,8 +91,9 @@ public class SinkTest extends InitializedNullHandlingTest
schema,
tuningConfig.getShardSpec(),
version,
+ tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
- TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
+ tuningConfig.getMaxBytesInMemoryOrDefault(),
tuningConfig.getDedupColumn()
);
@@ -220,6 +221,7 @@ public class SinkTest extends InitializedNullHandlingTest
final Interval interval = Intervals.of("2013-01-01/2013-01-02");
final String version = DateTimes.nowUtc().toString();
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
+ null,
100,
null,
new Period("P1Y"),
@@ -245,8 +247,9 @@ public class SinkTest extends InitializedNullHandlingTest
schema,
tuningConfig.getShardSpec(),
version,
+ tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
- TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
+ tuningConfig.getMaxBytesInMemoryOrDefault(),
tuningConfig.getDedupColumn()
);
diff --git a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java
index bb759b6..8dec453 100644
--- a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java
+++ b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java
@@ -156,6 +156,7 @@ public class DruidJsonValidatorTest
),
new RealtimeTuningConfig(
+ null,
1,
null,
new Period("PT10M"),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org