Posted to commits@druid.apache.org by pj...@apache.org on 2021/03/06 09:07:25 UTC
[druid] branch master updated: Dynamic auto scale Kafka-Stream
ingest tasks (#10524)
This is an automated email from the ASF dual-hosted git repository.
pjain1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new bddacbb Dynamic auto scale Kafka-Stream ingest tasks (#10524)
bddacbb is described below
commit bddacbb1c3abccf6ad035a4756a6960761fd43a2
Author: zhangyue19921010 <69...@users.noreply.github.com>
AuthorDate: Sat Mar 6 17:06:52 2021 +0800
Dynamic auto scale Kafka-Stream ingest tasks (#10524)
* druid task auto scale based on kafka lag
* fix kafkaSupervisorIOConfig and KinesisSupervisorIOConfig
* druid task auto scale based on kafka lag
* fix kafkaSupervisorIOConfig and KinesisSupervisorIOConfig
* test dynamic auto scale done
* auto scale tasks tested on prd cluster
* auto scale tasks tested on prd cluster
* modify code style to solve 29055.10 29055.9 29055.17 29055.18 29055.19 29055.20
* rename test file function
* change codes and add docs based on capistrant reviewed
* modify test docs
* modify docs
* modify docs
* modify docs
* merge from master
* Extract the autoScale logic out of SeekableStreamSupervisor to minimize putting more stuff inside there && Make autoscaling algorithm configurable and scalable.
* fix ci failed
* revert misc.xml
* add uts to test autoscaler create && scale out/in and kafka ingest with scale enable
* add more uts
* fix inner class check
* add IT for kafka ingestion with autoscaler
* add new IT in groups=kafka-index named testKafkaIndexDataWithWithAutoscaler
* review change
* code review
* remove unused imports
* fix NLP
* fix docs and UTs
* revert misc.xml
* use jackson to build autoScaleConfig with default values
* add uts
* use jackson to init AutoScalerConfig in IOConfig instead of Map<>
* autoscalerConfig interface and provide a defaultAutoScalerConfig
* modify uts
* modify docs
* fix checkstyle
* revert misc.xml
* modify uts
* reviewed code change
* reviewed code change
* code reviewed
* code review
* log changed
* do StringUtils.encodeForFormat when create allocationExec
* code review && limit taskCountMax to partitionNumbers
* modify docs
* code review
Co-authored-by: yuezhang <yu...@freewheel.tv>
---
.../development/extensions-core/kafka-ingestion.md | 109 +++
.../MaterializedViewSupervisor.java | 13 +
.../MaterializedViewSupervisorSpecTest.java | 82 ++
.../indexing/kafka/supervisor/KafkaSupervisor.java | 13 +
.../kafka/supervisor/KafkaSupervisorIOConfig.java | 5 +
.../druid/indexing/kafka/KafkaSamplerSpecTest.java | 1 +
.../kafka/supervisor/KafkaSupervisorTest.java | 199 +++++
.../kinesis/supervisor/KinesisSupervisor.java | 8 +
.../supervisor/KinesisSupervisorIOConfig.java | 12 +
.../indexing/kinesis/KinesisSamplerSpecTest.java | 1 +
.../kinesis/supervisor/KinesisSupervisorTest.java | 73 ++
indexing-service/pom.xml | 6 +-
.../overlord/supervisor/SupervisorManager.java | 31 +
.../supervisor/SeekableStreamSupervisor.java | 210 ++++-
.../SeekableStreamSupervisorIOConfig.java | 26 +-
.../supervisor/SeekableStreamSupervisorSpec.java | 18 +
.../supervisor/autoscaler/AutoScalerConfig.java | 43 +
.../supervisor/autoscaler/LagBasedAutoScaler.java | 244 ++++++
.../autoscaler/LagBasedAutoScalerConfig.java | 208 +++++
.../supervisor/autoscaler/NoopTaskAutoScaler.java | 47 +
.../OverlordSecurityResourceFilterTest.java | 8 +
.../supervisor/SupervisorResourceTest.java | 7 +
.../SeekableStreamSupervisorSpecTest.java | 950 +++++++++++++++++++++
.../SeekableStreamSupervisorStateTest.java | 39 +-
.../tests/indexer/AbstractStreamIndexingTest.java | 68 ++
...ingServiceNonTransactionalParallelizedTest.java | 10 +
.../supervisor_with_autoscaler_spec_template.json | 73 ++
pom.xml | 5 +
.../overlord/supervisor/NoopSupervisorSpec.java | 13 +
.../indexing/overlord/supervisor/Supervisor.java | 9 +
.../overlord/supervisor/SupervisorSpec.java | 6 +
.../overlord/supervisor/autoscaler/LagStats.java | 49 ++
.../autoscaler/SupervisorTaskAutoScaler.java | 27 +
.../druid/indexing/NoopSupervisorSpecTest.java | 67 ++
.../metadata/SQLMetadataSupervisorManagerTest.java | 7 +
.../apache/druid/metadata/TestSupervisorSpec.java | 7 +
36 files changed, 2671 insertions(+), 23 deletions(-)
diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md
index 7b94a99..0dd3167 100644
--- a/docs/development/extensions-core/kafka-ingestion.md
+++ b/docs/development/extensions-core/kafka-ingestion.md
@@ -146,6 +146,115 @@ A sample supervisor spec is shown below:
|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a [...]
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime an [...]
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageReject [...]
+|`autoScalerConfig`|Object|Defines how to autoscale the number of Kafka ingestion tasks. Currently supported only for Kafka indexing. See [Task Autoscaler Properties](#task-autoscaler-properties) for details.|no (default == null)|
+
+### Task Autoscaler Properties
+
+> Note that Task AutoScaler is currently designated as experimental.
+
+| Property | Description | Required |
+| ------------- | ------------- | ------------- |
+| `enableTaskAutoScaler` | Whether to enable the task autoscaler. If set to `false` or omitted, autoscaling is disabled even when `autoScalerConfig` is not null. | no (default == false) |
+| `taskCountMax` | Maximum number of ingestion tasks. Must be greater than or equal to `taskCountMin`. If `taskCountMax` is greater than `{numKafkaPartitions}`, the maximum number of reading tasks is capped at `{numKafkaPartitions}` and `taskCountMax` is ignored. | yes |
+| `taskCountMin` | Minimum number of ingestion tasks. When the autoscaler is enabled, `taskCount` in `ioConfig` is ignored. Ingestion starts with `taskCountMin` tasks and can scale up to `taskCountMax`. | yes |
+| `minTriggerScaleActionFrequencyMillis` | Minimum time interval, in milliseconds, between two scale actions. | no (default == 600000) |
+| `autoScalerStrategy` | The autoscaling algorithm. Only `lagBased` is currently supported. See [Lag Based AutoScaler Strategy Related Properties](#lag-based-autoscaler-strategy-related-properties) for details. | no (default == `lagBased`) |
+
+### Lag Based AutoScaler Strategy Related Properties
+
+| Property | Description | Required |
+| ------------- | ------------- | ------------- |
+| `lagCollectionIntervalMillis` | Interval, in milliseconds, at which lag points are collected. | no (default == 30000) |
+| `lagCollectionRangeMillis` | Total time window, in milliseconds, over which lag points are kept. Used together with `lagCollectionIntervalMillis`: within the most recent `lagCollectionRangeMillis`, a lag point is collected every `lagCollectionIntervalMillis`. | no (default == 600000) |
+| `scaleOutThreshold` | Lag threshold for a scale-out action. | no (default == 6000000) |
+| `triggerScaleOutFractionThreshold` | If at least this fraction of the collected lag points is higher than `scaleOutThreshold`, a scale-out action is triggered. | no (default == 0.3) |
+| `scaleInThreshold` | Lag threshold for a scale-in action. | no (default == 1000000) |
+| `triggerScaleInFractionThreshold` | If at least this fraction of the collected lag points is lower than `scaleInThreshold`, a scale-in action is triggered. | no (default == 0.9) |
+| `scaleActionStartDelayMillis` | Number of milliseconds to wait after the supervisor starts before the scaling logic is first checked. | no (default == 300000) |
+| `scaleActionPeriodMillis` | How often, in milliseconds, to check whether a scale action is needed. | no (default == 60000) |
+| `scaleInStep` | Number of tasks to remove in each scale-in action. | no (default == 1) |
+| `scaleOutStep` | Number of tasks to add in each scale-out action. | no (default == 2) |
+
+A sample supervisor spec with the `lagBased` autoscaler enabled is shown below:
+```json
+{
+  "type": "kafka",
+  "dataSchema": {
+    "dataSource": "metrics-kafka",
+    "timestampSpec": {
+      "column": "timestamp",
+      "format": "auto"
+    },
+    "dimensionsSpec": {
+      "dimensions": [],
+      "dimensionExclusions": [
+        "timestamp",
+        "value"
+      ]
+    },
+    "metricsSpec": [
+      {
+        "name": "count",
+        "type": "count"
+      },
+      {
+        "name": "value_sum",
+        "fieldName": "value",
+        "type": "doubleSum"
+      },
+      {
+        "name": "value_min",
+        "fieldName": "value",
+        "type": "doubleMin"
+      },
+      {
+        "name": "value_max",
+        "fieldName": "value",
+        "type": "doubleMax"
+      }
+    ],
+    "granularitySpec": {
+      "type": "uniform",
+      "segmentGranularity": "HOUR",
+      "queryGranularity": "NONE"
+    }
+  },
+  "ioConfig": {
+    "topic": "metrics",
+    "inputFormat": {
+      "type": "json"
+    },
+    "consumerProperties": {
+      "bootstrap.servers": "localhost:9092"
+    },
+    "autoScalerConfig": {
+      "enableTaskAutoScaler": true,
+      "taskCountMax": 6,
+      "taskCountMin": 2,
+      "minTriggerScaleActionFrequencyMillis": 600000,
+      "autoScalerStrategy": "lagBased",
+      "lagCollectionIntervalMillis": 30000,
+      "lagCollectionRangeMillis": 600000,
+      "scaleOutThreshold": 6000000,
+      "triggerScaleOutFractionThreshold": 0.3,
+      "scaleInThreshold": 1000000,
+      "triggerScaleInFractionThreshold": 0.9,
+      "scaleActionStartDelayMillis": 300000,
+      "scaleActionPeriodMillis": 60000,
+      "scaleInStep": 1,
+      "scaleOutStep": 2
+    },
+    "taskCount": 1,
+    "replicas": 1,
+    "taskDuration": "PT1H"
+  },
+  "tuningConfig": {
+    "type": "kafka",
+    "maxRowsPerSegment": 5000000
+  }
+}
+```
#### More on consumerProperties
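The lag-based strategy in the tables above can be read as a simple rule. First, keep the most recent `lagCollectionRangeMillis` worth of lag points, one every `lagCollectionIntervalMillis`. Then compare the fraction of points above `scaleOutThreshold`, or below `scaleInThreshold`, with the corresponding trigger fraction. The Java sketch below illustrates that reading only and is not the `LagBasedAutoScaler` from this commit. The class name `LagBasedDecisionSketch` is hypothetical. The strict comparisons follow the table wording, and checking scale-out before scale-in is an assumption. `minTriggerScaleActionFrequencyMillis` and the scheduling properties are not modeled.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical sketch of the lag-based scaling rule described in the docs.
 * NOT Druid's LagBasedAutoScaler; names and boundary handling are assumptions.
 */
final class LagBasedDecisionSketch
{
  private final Deque<Long> lagPoints = new ArrayDeque<>();
  private final int maxPoints;
  private final int taskCountMin;
  private final int taskCountMax;
  private final long scaleOutThreshold;
  private final double triggerScaleOutFractionThreshold;
  private final long scaleInThreshold;
  private final double triggerScaleInFractionThreshold;
  private final int scaleOutStep;
  private final int scaleInStep;

  LagBasedDecisionSketch(
      long lagCollectionRangeMillis, long lagCollectionIntervalMillis,
      int taskCountMin, int taskCountMax,
      long scaleOutThreshold, double triggerScaleOutFractionThreshold,
      long scaleInThreshold, double triggerScaleInFractionThreshold,
      int scaleOutStep, int scaleInStep
  )
  {
    // Assumption: the window holds rangeMillis / intervalMillis points (at least one).
    this.maxPoints = (int) Math.max(1, lagCollectionRangeMillis / lagCollectionIntervalMillis);
    this.taskCountMin = taskCountMin;
    this.taskCountMax = taskCountMax;
    this.scaleOutThreshold = scaleOutThreshold;
    this.triggerScaleOutFractionThreshold = triggerScaleOutFractionThreshold;
    this.scaleInThreshold = scaleInThreshold;
    this.triggerScaleInFractionThreshold = triggerScaleInFractionThreshold;
    this.scaleOutStep = scaleOutStep;
    this.scaleInStep = scaleInStep;
  }

  /** Records one lag point; intended to be called every lagCollectionIntervalMillis. */
  void addLagPoint(long lag)
  {
    if (lagPoints.size() == maxPoints) {
      lagPoints.removeFirst(); // drop the oldest point so only the recent window remains
    }
    lagPoints.addLast(lag);
  }

  int windowSize()
  {
    return lagPoints.size();
  }

  /** Returns the desired task count, or -1 when no scale action is needed. */
  int desiredTaskCount(int currentTaskCount, int partitionCount)
  {
    if (lagPoints.isEmpty()) {
      return -1;
    }
    int above = 0;
    int below = 0;
    for (long lag : lagPoints) {
      if (lag > scaleOutThreshold) {
        above++;
      }
      if (lag < scaleInThreshold) {
        below++;
      }
    }
    double aboveFraction = (double) above / lagPoints.size();
    double belowFraction = (double) below / lagPoints.size();
    // Reading tasks are capped at the partition count, as described for taskCountMax.
    int effectiveMax = Math.min(taskCountMax, partitionCount);
    if (aboveFraction >= triggerScaleOutFractionThreshold && currentTaskCount < effectiveMax) {
      return Math.min(currentTaskCount + scaleOutStep, effectiveMax);
    }
    if (belowFraction >= triggerScaleInFractionThreshold && currentTaskCount > taskCountMin) {
      return Math.max(currentTaskCount - scaleInStep, taskCountMin);
    }
    return -1;
  }
}
```

Consider the defaults, which give a window of 20 points, with half of the points above `scaleOutThreshold`. In that case this sketch asks a supervisor running 2 tasks on a 10-partition topic to scale to 4 tasks, or to 3 tasks if the topic has only 3 partitions.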
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index 41647d5..f456cb6 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -37,6 +37,7 @@ import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.JodaUtils;
@@ -282,6 +283,18 @@ public class MaterializedViewSupervisor implements Supervisor
// do nothing
}
+ @Override
+ public LagStats computeLagStats()
+ {
+ throw new UnsupportedOperationException("Compute Lag Stats is not supported in MaterializedViewSupervisor");
+ }
+
+ @Override
+ public int getActiveTaskGroupsCount()
+ {
+ throw new UnsupportedOperationException("Get Active Task Groups Count is not supported in MaterializedViewSupervisor");
+ }
+
/**
* Find intervals in which derived dataSource should rebuild the segments.
* Choose the latest intervals to create new HadoopIndexTask and submit it.
diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
index c82b8b8..01ebdd0 100644
--- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
+++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
@@ -29,7 +29,9 @@ import org.apache.druid.indexer.HadoopTuningConfig;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
@@ -50,6 +52,7 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
+import java.util.concurrent.Callable;
public class MaterializedViewSupervisorSpecTest
{
@@ -156,6 +159,85 @@ public class MaterializedViewSupervisorSpecTest
}
@Test
+ public void testMaterializedViewSupervisorSpecCreated()
+ {
+ Exception ex = null;
+
+ try {
+ MaterializedViewSupervisorSpec spec = new MaterializedViewSupervisorSpec(
+ "wikiticker",
+ new DimensionsSpec(
+ Lists.newArrayList(
+ new StringDimensionSchema("isUnpatrolled"),
+ new StringDimensionSchema("metroCode"),
+ new StringDimensionSchema("namespace"),
+ new StringDimensionSchema("page"),
+ new StringDimensionSchema("regionIsoCode"),
+ new StringDimensionSchema("regionName"),
+ new StringDimensionSchema("user")
+ ),
+ null,
+ null
+ ),
+ new AggregatorFactory[]{
+ new CountAggregatorFactory("count"),
+ new LongSumAggregatorFactory("added", "added")
+ },
+ HadoopTuningConfig.makeDefaultTuningConfig(),
+ null,
+ null,
+ null,
+ null,
+ null,
+ false,
+ objectMapper,
+ null,
+ null,
+ null,
+ null,
+ null,
+ new MaterializedViewTaskConfig(),
+ EasyMock.createMock(AuthorizerMapper.class),
+ new NoopChatHandlerProvider(),
+ new SupervisorStateManagerConfig()
+ );
+ Supervisor supervisor = spec.createSupervisor();
+ Assert.assertTrue(supervisor instanceof MaterializedViewSupervisor);
+
+ SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor);
+ Assert.assertNull(autoscaler);
+
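+ try {
+ supervisor.computeLagStats();
+ Assert.fail("Expected UnsupportedOperationException from computeLagStats()");
+ }
+ catch (UnsupportedOperationException e) {
+ // expected: MaterializedViewSupervisor does not support lag stats
+ }
+
+ try {
+ supervisor.getActiveTaskGroupsCount();
+ Assert.fail("Expected UnsupportedOperationException from getActiveTaskGroupsCount()");
+ }
+ catch (UnsupportedOperationException e) {
+ // expected: MaterializedViewSupervisor does not track active task groups
+ }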
+
+ Callable<Integer> noop = new Callable<Integer>() {
+ @Override
+ public Integer call()
+ {
+ return -1;
+ }
+ };
+
+ }
+ catch (Exception e) {
+ ex = e;
+ }
+
+ Assert.assertNull(ex);
+ }
+
+ @Test
public void testSuspendResuume() throws IOException
{
String supervisorStr = "{\n" +
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index d592f78..a02568a 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -38,6 +38,7 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
@@ -58,6 +59,7 @@ import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -331,6 +333,17 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long, Kaf
}
@Override
+ public LagStats computeLagStats()
+ {
+ Map<Integer, Long> partitionRecordLag = getPartitionRecordLag();
+ if (partitionRecordLag == null) {
+ return new LagStats(0, 0, 0);
+ }
+
+ return computeLags(partitionRecordLag);
+ }
+
+ @Override
protected void updatePartitionLagFromStream()
{
getRecordSupplierLock().lock();
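`KafkaSupervisor.computeLagStats()` returns `new LagStats(0, 0, 0)` when no partition lag is available. Otherwise it delegates to `computeLags`, which this hunk does not show. The sketch below shows one way such a reduction could work. It assumes the three `LagStats` values are the maximum, total, and average lag across partitions. The class `LagSummarySketch` is hypothetical, and the real `computeLags` may differ.

```java
import java.util.Map;

/**
 * Hypothetical sketch: reduce per-partition record lag to max, total, and average.
 * Assumes this is what the three LagStats values represent; not Druid's computeLags().
 */
final class LagSummarySketch
{
  final long maxLag;
  final long totalLag;
  final long avgLag;

  LagSummarySketch(long maxLag, long totalLag, long avgLag)
  {
    this.maxLag = maxLag;
    this.totalLag = totalLag;
    this.avgLag = avgLag;
  }

  static LagSummarySketch fromPartitionLags(Map<Integer, Long> partitionRecordLag)
  {
    // Mirrors the null guard in KafkaSupervisor.computeLagStats(). An empty map is
    // treated the same way here to avoid dividing by zero.
    if (partitionRecordLag == null || partitionRecordLag.isEmpty()) {
      return new LagSummarySketch(0, 0, 0);
    }
    long max = 0;
    long total = 0;
    for (long lag : partitionRecordLag.values()) {
      max = Math.max(max, lag);
      total += lag;
    }
    // Integer division: the average is truncated toward zero.
    return new LagSummarySketch(max, total, total / partitionRecordLag.size());
  }
}
```

For partition lags of 10, 30, and 5 records, this yields a maximum of 30, a total of 45, and an average of 15.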
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index 62c1e79..87b689e 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -24,10 +24,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.StringUtils;
import org.joda.time.DateTime;
import org.joda.time.Period;
+import javax.annotation.Nullable;
import java.util.Map;
public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
@@ -51,6 +53,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
@JsonProperty("taskCount") Integer taskCount,
@JsonProperty("taskDuration") Period taskDuration,
@JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
+ @Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig,
@JsonProperty("pollTimeout") Long pollTimeout,
@JsonProperty("startDelay") Period startDelay,
@JsonProperty("period") Period period,
@@ -73,6 +76,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
completionTimeout,
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
+ autoScalerConfig,
lateMessageRejectionStartDateTime
);
@@ -117,6 +121,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
", taskCount=" + getTaskCount() +
", taskDuration=" + getTaskDuration() +
", consumerProperties=" + consumerProperties +
+ ", autoScalerConfig=" + getAutoscalerConfig() +
", pollTimeout=" + pollTimeout +
", startDelay=" + getStartDelay() +
", period=" + getPeriod() +
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index c54a31f..dd712d9 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -134,6 +134,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null,
null,
null,
+ null,
true,
null,
null,
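The new `@Nullable autoScalerConfig` parameter on `KafkaSupervisorIOConfig` explains why callers such as this sampler test now pass an extra `null`. The commit message also notes that the autoscaler config is built with Jackson using default values. The plain-Java sketch below illustrates that idea only; it is not the Jackson-based `LagBasedAutoScalerConfig`. The class name is hypothetical, and only a subset of the documented properties is shown. Omitted fields fall back to the documented defaults. The sketch also applies the `taskCountMax >= taskCountMin` rule and, when autoscaling is enabled, the `taskCountMin` starting point described in the docs.

```java
/**
 * Hypothetical sketch of applying documented defaults to omitted autoscaler properties.
 * Only a subset of properties is shown; this is not Druid's LagBasedAutoScalerConfig.
 */
final class AutoScalerSettingsSketch
{
  final boolean enableTaskAutoScaler;
  final int taskCountMax;
  final int taskCountMin;
  final long lagCollectionIntervalMillis;
  final long scaleOutThreshold;
  final int scaleInStep;
  final int scaleOutStep;

  // Boxed parameters model JSON fields that may be absent (null).
  AutoScalerSettingsSketch(
      Boolean enableTaskAutoScaler,
      Integer taskCountMax,
      Integer taskCountMin,
      Long lagCollectionIntervalMillis,
      Long scaleOutThreshold,
      Integer scaleInStep,
      Integer scaleOutStep
  )
  {
    if (taskCountMax == null || taskCountMin == null) {
      throw new IllegalArgumentException("taskCountMax and taskCountMin are required");
    }
    if (taskCountMax < taskCountMin) {
      throw new IllegalArgumentException("taskCountMax must be >= taskCountMin");
    }
    // Omitted or false disables the autoscaler (default == false).
    this.enableTaskAutoScaler = Boolean.TRUE.equals(enableTaskAutoScaler);
    this.taskCountMax = taskCountMax;
    this.taskCountMin = taskCountMin;
    // Defaults below are taken from the documentation table.
    this.lagCollectionIntervalMillis = lagCollectionIntervalMillis != null ? lagCollectionIntervalMillis : 30000L;
    this.scaleOutThreshold = scaleOutThreshold != null ? scaleOutThreshold : 6000000L;
    this.scaleInStep = scaleInStep != null ? scaleInStep : 1;
    this.scaleOutStep = scaleOutStep != null ? scaleOutStep : 2;
  }

  /** When enabled, ingestion starts at taskCountMin and the ioConfig taskCount is ignored. */
  int initialTaskCount(int ioConfigTaskCount)
  {
    return enableTaskAutoScaler ? taskCountMin : ioConfigTaskCount;
  }
}
```

Suppose `enableTaskAutoScaler` is `true`, `taskCountMin` is 2, and the other properties are omitted. This sketch then starts at 2 tasks regardless of the `ioConfig` `taskCount` and uses the documented defaults, for example a `scaleOutStep` of 2.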
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 1dbabe8..ac6b6e5 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -61,13 +61,16 @@ import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@@ -158,6 +161,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
private RowIngestionMetersFactory rowIngestionMetersFactory;
private ExceptionCapturingServiceEmitter serviceEmitter;
private SupervisorStateManagerConfig supervisorConfig;
+ private KafkaSupervisorIngestionSpec ingestionSchema;
private static String getTopic()
{
@@ -214,6 +218,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
serviceEmitter = new ExceptionCapturingServiceEmitter();
EmittingLogger.registerEmitter(serviceEmitter);
supervisorConfig = new SupervisorStateManagerConfig();
+ ingestionSchema = EasyMock.createMock(KafkaSupervisorIngestionSpec.class);
}
@After
@@ -233,6 +238,197 @@ public class KafkaSupervisorTest extends EasyMockSupport
}
@Test
+ public void testNoInitialStateWithAutoscaler() throws Exception
+ {
+ KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(
+ null,
+ null
+ )
+ {
+ @Override
+ public KafkaIndexTaskClient build(
+ TaskInfoProvider taskInfoProvider,
+ String dataSource,
+ int numThreads,
+ Duration httpTimeout,
+ long numRetries
+ )
+ {
+ Assert.assertEquals(TEST_CHAT_THREADS, numThreads);
+ Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout);
+ Assert.assertEquals(TEST_CHAT_RETRIES, numRetries);
+ return taskClient;
+ }
+ };
+
+ HashMap<String, Object> autoScalerConfig = new HashMap<>();
+ autoScalerConfig.put("enableTaskAutoScaler", true);
+ autoScalerConfig.put("lagCollectionIntervalMillis", 500);
+ autoScalerConfig.put("lagCollectionRangeMillis", 500);
+ autoScalerConfig.put("scaleOutThreshold", 0);
+ autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.0);
+ autoScalerConfig.put("scaleInThreshold", 1000000);
+ autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
+ autoScalerConfig.put("scaleActionStartDelayMillis", 0);
+ autoScalerConfig.put("scaleActionPeriodMillis", 100);
+ autoScalerConfig.put("taskCountMax", 2);
+ autoScalerConfig.put("taskCountMin", 1);
+ autoScalerConfig.put("scaleInStep", 1);
+ autoScalerConfig.put("scaleOutStep", 2);
+ autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
+
+ final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
+ consumerProperties.put("myCustomKey", "myCustomValue");
+ consumerProperties.put("bootstrap.servers", kafkaHost);
+
+ KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
+ topic,
+ INPUT_FORMAT,
+ 1,
+ 1,
+ new Period("PT1H"),
+ consumerProperties,
+ OBJECT_MAPPER.convertValue(autoScalerConfig, LagBasedAutoScalerConfig.class),
+ KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+ new Period("P1D"),
+ new Period("PT30S"),
+ true,
+ new Period("PT30M"),
+ null,
+ null,
+ null
+ );
+
+ final KafkaSupervisorTuningConfig tuningConfigOri = new KafkaSupervisorTuningConfig(
+ null,
+ 1000,
+ null,
+ null,
+ 50000,
+ null,
+ new Period("P1Y"),
+ new File("/test"),
+ null,
+ null,
+ null,
+ true,
+ false,
+ null,
+ false,
+ null,
+ numThreads,
+ TEST_CHAT_THREADS,
+ TEST_CHAT_RETRIES,
+ TEST_HTTP_TIMEOUT,
+ TEST_SHUTDOWN_TIMEOUT,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+
+ EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(kafkaSupervisorIOConfig).anyTimes();
+ EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+ EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(tuningConfigOri).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+ SeekableStreamSupervisorSpec testableSupervisorSpec = new KafkaSupervisorSpec(
+ ingestionSchema,
+ dataSchema,
+ tuningConfigOri,
+ kafkaSupervisorIOConfig,
+ null,
+ false,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ taskClientFactory,
+ OBJECT_MAPPER,
+ new NoopServiceEmitter(),
+ new DruidMonitorSchedulerConfig(),
+ rowIngestionMetersFactory,
+ new SupervisorStateManagerConfig()
+ );
+
+ supervisor = new TestableKafkaSupervisor(
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ taskClientFactory,
+ OBJECT_MAPPER,
+ (KafkaSupervisorSpec) testableSupervisorSpec,
+ rowIngestionMetersFactory
+ );
+
+ SupervisorTaskAutoScaler autoscaler = testableSupervisorSpec.createAutoscaler(supervisor);
+
+
+ final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
+ addSomeEvents(1);
+
+ Capture<KafkaIndexTask> captured = Capture.newInstance();
+ EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+ EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+ new KafkaDataSourceMetadata(
+ null
+ )
+ ).anyTimes();
+ EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+ taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
+ replayAll();
+
+ supervisor.start();
+ int taskCountBeforeScale = supervisor.getIoConfig().getTaskCount();
+ Assert.assertEquals(1, taskCountBeforeScale);
+ autoscaler.start();
+ supervisor.runInternal();
+ Thread.sleep(1 * 1000);
+ verifyAll();
+
+ int taskCountAfterScale = supervisor.getIoConfig().getTaskCount();
+ Assert.assertEquals(2, taskCountAfterScale);
+
+
+ KafkaIndexTask task = captured.getValue();
+ Assert.assertEquals(KafkaSupervisorTest.dataSchema, task.getDataSchema());
+ Assert.assertEquals(tuningConfig.convertToTaskTuningConfig(), task.getTuningConfig());
+
+ KafkaIndexTaskIOConfig taskConfig = task.getIOConfig();
+ Assert.assertEquals(kafkaHost, taskConfig.getConsumerProperties().get("bootstrap.servers"));
+ Assert.assertEquals("myCustomValue", taskConfig.getConsumerProperties().get("myCustomKey"));
+ Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
+ Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
+ Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent());
+ Assert.assertFalse("maximumMessageTime", taskConfig.getMaximumMessageTime().isPresent());
+
+ Assert.assertEquals(topic, taskConfig.getStartSequenceNumbers().getStream());
+ Assert.assertEquals(0L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(0));
+ Assert.assertEquals(0L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(1));
+ Assert.assertEquals(0L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(2));
+
+ Assert.assertEquals(topic, taskConfig.getEndSequenceNumbers().getStream());
+ Assert.assertEquals(
+ Long.MAX_VALUE,
+ (long) taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(0)
+ );
+ Assert.assertEquals(
+ Long.MAX_VALUE,
+ (long) taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(1)
+ );
+ Assert.assertEquals(
+ Long.MAX_VALUE,
+ (long) taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(2)
+ );
+
+ autoscaler.reset();
+ autoscaler.stop();
+ }
+
+ @Test
public void testCreateBaseTaskContexts() throws JsonProcessingException
{
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
@@ -3379,6 +3575,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskCount,
new Period(duration),
consumerProperties,
+ null,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
new Period("P1D"),
new Period("PT30S"),
@@ -3491,6 +3688,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskCount,
new Period(duration),
consumerProperties,
+ null,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
new Period("P1D"),
new Period("PT30S"),
@@ -3607,6 +3805,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskCount,
new Period(duration),
consumerProperties,
+ null,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
new Period("P1D"),
new Period("PT30S"),
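`testNoInitialStateWithAutoscaler` expects the task count to go from 1 to 2. Because `scaleOutThreshold` is 0 and `triggerScaleOutFractionThreshold` is 0.0, the scale-out trigger is effectively always satisfied, so the result depends only on the step and the caps. The expected value can be checked with the arithmetic below. This rests on three assumptions: scale-out takes precedence, the new count is clamped to the smaller of `taskCountMax` and the partition count, and the test topic has three partitions, as the assertions on partitions 0 to 2 suggest.

```latex
n_{\text{after}} = \min\bigl(n_{\text{before}} + \text{scaleOutStep},\ \min(\text{taskCountMax},\ n_{\text{partitions}})\bigr)
                 = \min\bigl(1 + 2,\ \min(2,\ 3)\bigr) = 2
```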
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index a7bc599..92defd2 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -39,6 +39,7 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
@@ -378,6 +379,13 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
return true;
}
+ // Not yet supported: computing lag stats for Kinesis requires a proper way to measure Kinesis lag.
+ @Override
+ public LagStats computeLagStats()
+ {
+ throw new UnsupportedOperationException("Compute Lag Stats is not supported in KinesisSupervisor yet.");
+ }
+
@Override
protected Map<String, OrderedSequenceNumber<String>> filterExpiredPartitionsFromStartingOffsets(
Map<String, OrderedSequenceNumber<String>> startingOffsets
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
index f68e0b7..b43cece 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
@@ -26,9 +26,12 @@ import org.apache.druid.data.input.InputFormat;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskIOConfig;
import org.apache.druid.indexing.kinesis.KinesisRegion;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.joda.time.DateTime;
import org.joda.time.Period;
+import javax.annotation.Nullable;
+
public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
{
private final String endpoint;
@@ -70,6 +73,7 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
@JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
@JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
@JsonProperty("awsExternalId") String awsExternalId,
+ @Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig,
@JsonProperty("deaggregate") boolean deaggregate
)
{
@@ -85,8 +89,16 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
completionTimeout,
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
+ autoScalerConfig,
lateMessageRejectionStartDateTime
);
+
+ // Dynamic task allocation is not supported for Kinesis yet, so
+ // throw UnsupportedOperationException if a Kinesis supervisor spec sets autoScalerConfig.
+ if (autoScalerConfig != null) {
+ throw new UnsupportedOperationException("Task autoscaler is not supported for Kinesis yet. Remove autoScalerConfig or set it to null.");
+ }
+
this.endpoint = endpoint != null
? endpoint
: (region != null ? region.getEndpoint() : KinesisRegion.US_EAST_1.getEndpoint());
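Both Kinesis changes follow a fail-fast pattern: the IO config constructor rejects a non-null `autoScalerConfig`, and `computeLagStats()` throws `UnsupportedOperationException`. A minimal sketch of the constructor-side check, assuming hypothetical `StreamIOConfig` and `AutoScalerSettings` classes in place of the real config types; only the validation pattern mirrors the diff.

```java
import java.util.Objects;

// Hypothetical stand-in for AutoScalerConfig; only its presence matters here.
final class AutoScalerSettings
{
  final int taskCountMin;
  final int taskCountMax;

  AutoScalerSettings(int taskCountMin, int taskCountMax)
  {
    this.taskCountMin = taskCountMin;
    this.taskCountMax = taskCountMax;
  }
}

// Hypothetical stand-in for a stream-specific supervisor IO config.
final class StreamIOConfig
{
  private final String stream;
  private final AutoScalerSettings autoScaler; // may be null

  StreamIOConfig(String stream, AutoScalerSettings autoScaler, boolean autoScalingSupported)
  {
    this.stream = Objects.requireNonNull(stream, "stream cannot be null");
    // Reject the unsupported option while the config is being built,
    // so a misconfigured spec fails immediately instead of at scale time.
    if (autoScaler != null && !autoScalingSupported) {
      throw new UnsupportedOperationException("Task autoscaling is not supported for this stream type");
    }
    this.autoScaler = autoScaler;
  }

  String getStream()
  {
    return stream;
  }

  AutoScalerSettings getAutoScaler()
  {
    return autoScaler;
  }
}
```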
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
index 0b9ebdb..e17ce1e 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
@@ -153,6 +153,7 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
null,
null,
null,
+ null,
false
),
null,
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 3f35204..5d1d0f6 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -66,6 +66,7 @@ import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@@ -102,6 +103,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -304,6 +306,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
1000,
null,
null,
+ null,
false
);
KinesisIndexTaskClientFactory clientFactory = new KinesisIndexTaskClientFactory(null, OBJECT_MAPPER);
@@ -345,6 +348,72 @@ public class KinesisSupervisorTest extends EasyMockSupport
}
@Test
+ public void testKinesisIOConfig()
+ {
+ Exception e = null;
+ try {
+ KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
+ STREAM,
+ INPUT_FORMAT,
+ "awsEndpoint",
+ null,
+ 1,
+ 1,
+ new Period("PT30M"),
+ new Period("P1D"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null,
+ null,
+ 100,
+ 1000,
+ null,
+ null,
+ null,
+ false
+ );
+ AutoScalerConfig autoScalerConfig = kinesisSupervisorIOConfig.getAutoscalerConfig();
+ Assert.assertNull(autoScalerConfig);
+ }
+ catch (Exception ex) {
+ e = ex;
+ }
+ Assert.assertNull(e);
+
+ try {
+ KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
+ STREAM,
+ INPUT_FORMAT,
+ "awsEndpoint",
+ null,
+ 1,
+ 1,
+ new Period("PT30M"),
+ new Period("P1D"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null,
+ null,
+ 100,
+ 1000,
+ null,
+ null,
+ OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class),
+ false
+ );
+ }
+ catch (Exception ex) {
+ e = ex;
+ }
+ Assert.assertNotNull(e);
+ Assert.assertTrue(e instanceof UnsupportedOperationException);
+ }
+
+ @Test
public void testMultiTask() throws Exception
{
supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null);
@@ -4721,6 +4790,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
+ null,
false
);
@@ -4863,6 +4933,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
fetchDelayMillis,
null,
null,
+ null,
false
);
@@ -4950,6 +5021,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
fetchDelayMillis,
null,
null,
+ null,
false
);
@@ -5039,6 +5111,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
fetchDelayMillis,
null,
null,
+ null,
false
);
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 5564122..be8360b 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -62,7 +62,11 @@
<artifactId>druid-hll</artifactId>
<version>${project.parent.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-collections4</artifactId>
+ <version>4.2</version>
+ </dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 48153b0..b638fcf 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -23,6 +23,7 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
@@ -30,6 +31,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.MetadataSupervisorManager;
import javax.annotation.Nullable;
+
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -44,6 +46,8 @@ public class SupervisorManager
private final MetadataSupervisorManager metadataSupervisorManager;
private final ConcurrentHashMap<String, Pair<Supervisor, SupervisorSpec>> supervisors = new ConcurrentHashMap<>();
+ // Keyed by supervisor ID; get() returns null for supervisors without a registered autoscaler.
+ private final ConcurrentHashMap<String, SupervisorTaskAutoScaler> autoscalers = new ConcurrentHashMap<>();
private final Object lock = new Object();
private volatile boolean started = false;
@@ -54,6 +58,11 @@ public class SupervisorManager
this.metadataSupervisorManager = metadataSupervisorManager;
}
+ public MetadataSupervisorManager getMetadataSupervisorManager()
+ {
+ return metadataSupervisorManager;
+ }
+
public Set<String> getSupervisorIds()
{
return supervisors.keySet();
@@ -140,12 +149,17 @@ public class SupervisorManager
for (String id : supervisors.keySet()) {
try {
supervisors.get(id).lhs.stop(false);
+ SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
+ if (autoscaler != null) {
+ autoscaler.stop();
+ }
}
catch (Exception e) {
log.warn(e, "Caught exception while stopping supervisor [%s]", id);
}
}
supervisors.clear();
+ autoscalers.clear();
started = false;
}
@@ -187,6 +201,10 @@ public class SupervisorManager
}
supervisor.lhs.reset(dataSourceMetadata);
+ SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
+ if (autoscaler != null) {
+ autoscaler.reset();
+ }
return true;
}
@@ -238,6 +256,12 @@ public class SupervisorManager
pair.lhs.stop(true);
supervisors.remove(id);
+ SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
+ if (autoscaler != null) {
+ autoscaler.stop();
+ autoscalers.remove(id);
+ }
+
return true;
}
@@ -282,9 +306,16 @@ public class SupervisorManager
}
Supervisor supervisor;
+ SupervisorTaskAutoScaler autoscaler;
try {
supervisor = spec.createSupervisor();
+ autoscaler = spec.createAutoscaler(supervisor);
+
supervisor.start();
+ if (autoscaler != null) {
+ autoscaler.start();
+ autoscalers.put(id, autoscaler);
+ }
}
catch (Exception e) {
// Supervisor creation or start failed write tombstone only when trying to start a new supervisor
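The `SupervisorManager` changes keep autoscalers in a separate map keyed by supervisor ID and mirror each supervisor lifecycle call (start, reset, stop) on the matching autoscaler. A minimal sketch of that bookkeeping, assuming a hypothetical `TaskAutoScaler` interface in place of Druid's `SupervisorTaskAutoScaler` and a hypothetical `AutoScalerRegistry` class. One small design choice it shows: `ConcurrentHashMap.remove(id)` returns the removed value, which folds the diff's `get` followed by `remove` into one step.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical minimal interface standing in for SupervisorTaskAutoScaler.
interface TaskAutoScaler
{
  void start();

  void stop();

  void reset();
}

final class AutoScalerRegistry
{
  // ConcurrentHashMap rejects null values, so "no autoscaler" is a missing key.
  private final Map<String, TaskAutoScaler> autoscalers = new ConcurrentHashMap<>();

  void register(String id, TaskAutoScaler autoscaler)
  {
    if (autoscaler != null) {
      autoscaler.start();
      autoscalers.put(id, autoscaler);
    }
  }

  void reset(String id)
  {
    TaskAutoScaler autoscaler = autoscalers.get(id);
    if (autoscaler != null) {
      autoscaler.reset();
    }
  }

  // remove() returns the previous mapping, so lookup and removal happen together.
  void stopAndRemove(String id)
  {
    TaskAutoScaler autoscaler = autoscalers.remove(id);
    if (autoscaler != null) {
      autoscaler.stop();
    }
  }

  void stopAll()
  {
    for (TaskAutoScaler autoscaler : autoscalers.values()) {
      autoscaler.stop();
    }
    autoscalers.clear();
  }

  int size()
  {
    return autoscalers.size();
  }
}
```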
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index e63f538..11339d4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -56,8 +56,10 @@ import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
@@ -71,6 +73,7 @@ import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
@@ -82,6 +85,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.EntryExistsException;
+import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.joda.time.DateTime;
@@ -102,6 +106,7 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
@@ -318,6 +323,127 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
+ // Changes taskCount without resubmitting the supervisor.
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ Callable<Integer> scaleAction;
+
+ DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
+ {
+ this.scaleAction = scaleAction;
+ }
+
+ /**
+ * Skips the notice if the supervisor is suspended, tasks are pending completion, or the last scale action was too recent; otherwise applies the task count returned by scaleAction.
+ */
+ @Override
+ public void handle()
+ {
+ if (autoScalerConfig == null) {
+ log.warn("autoScalerConfig is null but a DynamicAllocationTasksNotice was submitted; ignoring it.");
+ } else {
+ try {
+ long nowTime = System.currentTimeMillis();
+ if (spec.isSuspended()) {
+ log.info("Skipping DynamicAllocationTasksNotice execution because [%s] supervisor is suspended",
+ dataSource
+ );
+ return;
+ }
+ log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s]", pendingCompletionTaskGroups,
+ dataSource
+ );
+ for (CopyOnWriteArrayList<TaskGroup> list : pendingCompletionTaskGroups.values()) {
+ if (!list.isEmpty()) {
+ log.info(
+ "Skipping DynamicAllocationTasksNotice execution for datasource [%s] because the following tasks are pending [%s]",
+ dataSource, pendingCompletionTaskGroups
+ );
+ return;
+ }
+ }
+ if (nowTime - dynamicTriggerLastRunTime < autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
+ log.info(
+ "DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerScaleActionFrequencyMillis is [%s] for dataSource [%s], skipping it!",
+ nowTime - dynamicTriggerLastRunTime, autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(), dataSource
+ );
+ return;
+ }
+ final Integer desiredTaskCount = scaleAction.call();
+ boolean allocationSuccess = changeTaskCount(desiredTaskCount);
+ if (allocationSuccess) {
+ dynamicTriggerLastRunTime = nowTime;
+ }
+ }
+ catch (Exception ex) {
+ log.warn(ex, "Error handling DynamicAllocationTasksNotice");
+ }
+ }
+ }
+ }
+
+ /**
+ * Applies the scale action for a desired task count computed from the collected lag points.
+ * If a scale action is triggered:
+ * First, gracefulShutdownInternal() moves the datasource's current ingestion tasks from reading to publishing.
+ * Second, the taskCount in SeekableStreamSupervisorIOConfig is changed and synced to metadata storage.
+ * Finally, the stateful data structures are cleared: activelyReadingTaskGroups, partitionGroups, partitionOffsets, pendingCompletionTaskGroups and partitionIds. These structures are rebuilt in the next RunNotice.
+ * After taskCount changes in SeekableStreamSupervisorIOConfig, the next RunNotice creates the scaled number of ingestion tasks without resubmitting the supervisor.
+ * @param desiredActiveTaskCount desired taskCount computed by the autoscaler
+ * @return true if a scale action was executed, in which case at least 'minTriggerScaleActionFrequencyMillis' must elapse before the next 'changeTaskCount'.
+ * If false, 'changeTaskCount' runs again after 'scaleActionPeriodMillis' millis.
+ * @throws InterruptedException
+ * @throws ExecutionException
+ * @throws TimeoutException
+ */
+ private boolean changeTaskCount(int desiredActiveTaskCount) throws InterruptedException, ExecutionException, TimeoutException
+ {
+ int currentActiveTaskCount;
+ Collection<TaskGroup> activeTaskGroups = activelyReadingTaskGroups.values();
+ currentActiveTaskCount = activeTaskGroups.size();
+
+ if (desiredActiveTaskCount < 0 || desiredActiveTaskCount == currentActiveTaskCount) {
+ return false;
+ } else {
+ log.info(
+ "Starting scale action, current active task count is [%d] and desired task count is [%d] for dataSource [%s].",
+ currentActiveTaskCount, desiredActiveTaskCount, dataSource
+ );
+ gracefulShutdownInternal();
+ changeTaskCountInIOConfig(desiredActiveTaskCount);
+ clearAllocationInfo();
+ log.info("Changed taskCount to [%s] for dataSource [%s].", desiredActiveTaskCount, dataSource);
+ return true;
+ }
+ }
+
+ private void changeTaskCountInIOConfig(int desiredActiveTaskCount)
+ {
+ ioConfig.setTaskCount(desiredActiveTaskCount);
+ try {
+ Optional<SupervisorManager> supervisorManager = taskMaster.getSupervisorManager();
+ if (supervisorManager.isPresent()) {
+ MetadataSupervisorManager metadataSupervisorManager = supervisorManager.get().getMetadataSupervisorManager();
+ metadataSupervisorManager.insert(dataSource, spec);
+ } else {
+ log.error("SupervisorManager is absent in taskMaster; taskCount for dataSource [%s] was not synced to metadata storage.", dataSource);
+ }
+ }
+ catch (Exception e) {
+ log.error(e, "Failed to sync taskCount to metadata storage for dataSource [%s].", dataSource);
+ }
+ }
+
+ private void clearAllocationInfo()
+ {
+ activelyReadingTaskGroups.clear();
+ partitionGroups.clear();
+ partitionOffsets.clear();
+
+ pendingCompletionTaskGroups.clear();
+ partitionIds.clear();
+ }
+
private class GracefulShutdownNotice extends ShutdownNotice
{
@Override
@@ -470,6 +596,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private final SeekableStreamIndexTaskClient<PartitionIdType, SequenceOffsetType> taskClient;
private final SeekableStreamSupervisorSpec spec;
private final SeekableStreamSupervisorIOConfig ioConfig;
+ private final AutoScalerConfig autoScalerConfig;
private final SeekableStreamSupervisorTuningConfig tuningConfig;
private final SeekableStreamIndexTaskTuningConfig taskTuningConfig;
private final String supervisorId;
@@ -488,6 +615,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private final boolean useExclusiveStartingSequence;
private boolean listenerRegistered = false;
private long lastRunTime;
+ private long dynamicTriggerLastRunTime;
private int initRetryCounter = 0;
private volatile DateTime firstRunTime;
private volatile DateTime earlyStopTime = null;
@@ -519,20 +647,40 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
this.useExclusiveStartingSequence = useExclusiveStartingSequence;
this.dataSource = spec.getDataSchema().getDataSource();
this.ioConfig = spec.getIoConfig();
+ this.autoScalerConfig = ioConfig.getAutoscalerConfig();
this.tuningConfig = spec.getTuningConfig();
this.taskTuningConfig = this.tuningConfig.convertToTaskTuningConfig();
this.supervisorId = supervisorId;
this.exec = Execs.singleThreaded(StringUtils.encodeForFormat(supervisorId));
this.scheduledExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Scheduler-%d");
this.reportingExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Reporting-%d");
+
this.stateManager = new SeekableStreamSupervisorStateManager(
spec.getSupervisorStateManagerConfig(),
spec.isSuspended()
);
- int workerThreads = (this.tuningConfig.getWorkerThreads() != null
- ? this.tuningConfig.getWorkerThreads()
- : Math.min(10, this.ioConfig.getTaskCount()));
+ int workerThreads;
+ int chatThreads;
+ if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler()) {
+ log.info("Running task autoscaler for datasource [%s]", dataSource);
+
+ workerThreads = (this.tuningConfig.getWorkerThreads() != null
+ ? this.tuningConfig.getWorkerThreads()
+ : Math.min(10, autoScalerConfig.getTaskCountMax()));
+
+ chatThreads = (this.tuningConfig.getChatThreads() != null
+ ? this.tuningConfig.getChatThreads()
+ : Math.min(10, autoScalerConfig.getTaskCountMax() * this.ioConfig.getReplicas()));
+ } else {
+ workerThreads = (this.tuningConfig.getWorkerThreads() != null
+ ? this.tuningConfig.getWorkerThreads()
+ : Math.min(10, this.ioConfig.getTaskCount()));
+
+ chatThreads = (this.tuningConfig.getChatThreads() != null
+ ? this.tuningConfig.getChatThreads()
+ : Math.min(10, this.ioConfig.getTaskCount() * this.ioConfig.getReplicas()));
+ }
this.workerExec = MoreExecutors.listeningDecorator(
Execs.multiThreaded(
@@ -578,9 +726,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
+ IndexTaskClient.MAX_RETRY_WAIT_SECONDS)
);
- int chatThreads = (this.tuningConfig.getChatThreads() != null
- ? this.tuningConfig.getChatThreads()
- : Math.min(10, this.ioConfig.getTaskCount() * this.ioConfig.getReplicas()));
this.taskClient = taskClientFactory.build(
taskInfoProvider,
dataSource,
@@ -597,6 +742,13 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
}
+
+ @Override
+ public int getActiveTaskGroupsCount()
+ {
+ return activelyReadingTaskGroups.values().size();
+ }
+
@Override
public void start()
{
@@ -659,6 +811,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
scheduledExec.shutdownNow(); // stop recurring executions
reportingExec.shutdownNow();
+
if (started) {
Optional<TaskRunner> taskRunner = taskMaster.getTaskRunner();
if (taskRunner.isPresent()) {
@@ -774,7 +927,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
scheduleReporting(reportingExec);
-
started = true;
log.info(
"Started SeekableStreamSupervisor[%s], first run in [%s], with spec: [%s]",
@@ -797,6 +949,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
+ public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction)
+ {
+ return () -> notices.add(new DynamicAllocationTasksNotice(scaleAction));
+ }
+
private Runnable buildRunTask()
{
return () -> notices.add(new RunNotice());
@@ -1901,6 +2058,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return false;
}
+ public int getPartitionCount()
+ {
+ return recordSupplier.getPartitionIds(ioConfig.getStream()).size();
+ }
+
private boolean updatePartitionDataFromStream()
{
List<PartitionIdType> previousPartitionIds = new ArrayList<>(partitionIds);
@@ -3510,29 +3672,21 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return;
}
- long maxLag = 0, totalLag = 0, avgLag;
- for (long lag : partitionLags.values()) {
- if (lag > maxLag) {
- maxLag = lag;
- }
- totalLag += lag;
- }
- avgLag = partitionLags.size() == 0 ? 0 : totalLag / partitionLags.size();
-
+ LagStats lagStats = computeLags(partitionLags);
emitter.emit(
ServiceMetricEvent.builder()
.setDimension("dataSource", dataSource)
- .build(StringUtils.format("ingest/%s/lag%s", type, suffix), totalLag)
+ .build(StringUtils.format("ingest/%s/lag%s", type, suffix), lagStats.getTotalLag())
);
emitter.emit(
ServiceMetricEvent.builder()
.setDimension("dataSource", dataSource)
- .build(StringUtils.format("ingest/%s/maxLag%s", type, suffix), maxLag)
+ .build(StringUtils.format("ingest/%s/maxLag%s", type, suffix), lagStats.getMaxLag())
);
emitter.emit(
ServiceMetricEvent.builder()
.setDimension("dataSource", dataSource)
- .build(StringUtils.format("ingest/%s/avgLag%s", type, suffix), avgLag)
+ .build(StringUtils.format("ingest/%s/avgLag%s", type, suffix), lagStats.getAvgLag())
);
};
@@ -3545,6 +3699,24 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
+
+ /**
+ * Computes maxLag, totalLag and avgLag (integer average; 0 when there are no partitions).
+ * @param partitionLags lag per partition
+ * @return the computed LagStats
+ */
+ protected LagStats computeLags(Map<PartitionIdType, Long> partitionLags)
+ {
+ long maxLag = 0, totalLag = 0, avgLag;
+ for (long lag : partitionLags.values()) {
+ if (lag > maxLag) {
+ maxLag = lag;
+ }
+ totalLag += lag;
+ }
+ avgLag = partitionLags.size() == 0 ? 0 : totalLag / partitionLags.size();
+ return new LagStats(maxLag, totalLag, avgLag);
+ }
+
/**
* a special sequence number that is used to indicate that the sequence offset
* for a particular partition has not yet been calculated by the supervisor. When
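`DynamicAllocationTasksNotice.handle()` gates every scale request before calling `changeTaskCount()`: it skips the request when the supervisor is suspended, when any task group is pending completion, or when less than `minTriggerScaleActionFrequencyMillis` has passed since the last successful scale action; `changeTaskCount()` then treats a negative or unchanged desired count as a no-op. The sketch below models only that decision logic, under stated assumptions: `ScaleGate` and its fields are hypothetical, task groups are reduced to lists of task IDs, and the graceful shutdown, metadata sync and state clearing are replaced by a single counter update.

```java
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical, simplified model of the checks in DynamicAllocationTasksNotice.handle()
// followed by changeTaskCount(); not the Druid implementation.
final class ScaleGate
{
  private final long minTriggerScaleActionFrequencyMillis;
  private long lastScaleTimeMillis; // 0 means "never scaled"
  private int activeTaskCount;

  ScaleGate(long minTriggerScaleActionFrequencyMillis, int initialTaskCount)
  {
    this.minTriggerScaleActionFrequencyMillis = minTriggerScaleActionFrequencyMillis;
    this.activeTaskCount = initialTaskCount;
  }

  /** Returns true only if a scale action was actually applied. */
  boolean handle(
      long nowMillis,
      boolean suspended,
      List<List<String>> pendingCompletionGroups,
      Callable<Integer> scaleAction
  )
  {
    if (suspended) {
      return false;
    }
    for (List<String> group : pendingCompletionGroups) {
      if (!group.isEmpty()) {
        return false; // wait until publishing tasks finish
      }
    }
    if (nowMillis - lastScaleTimeMillis < minTriggerScaleActionFrequencyMillis) {
      return false; // still inside the cool-down window
    }
    final int desired;
    try {
      desired = scaleAction.call();
    }
    catch (Exception e) {
      return false; // the real notice logs a warning and skips this round
    }
    // Mirrors changeTaskCount(): a negative or unchanged count means "do nothing".
    if (desired < 0 || desired == activeTaskCount) {
      return false;
    }
    activeTaskCount = desired; // the real code also shuts tasks down and persists the spec
    lastScaleTimeMillis = nowMillis; // cool-down starts only after a successful action
    return true;
  }

  int getActiveTaskCount()
  {
    return activeTaskCount;
  }
}
```

Because the timestamp is updated only on success, a rejected or no-op request does not restart the cool-down, which matches how `dynamicTriggerLastRunTime` is set in the diff.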
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index 723e22e..3ed55ec 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.IAE;
import org.joda.time.DateTime;
import org.joda.time.Duration;
@@ -37,7 +38,7 @@ public abstract class SeekableStreamSupervisorIOConfig
@Nullable
private final InputFormat inputFormat; // nullable for backward compatibility
private final Integer replicas;
- private final Integer taskCount;
+ private Integer taskCount;
private final Duration taskDuration;
private final Duration startDelay;
private final Duration period;
@@ -46,6 +47,7 @@ public abstract class SeekableStreamSupervisorIOConfig
private final Optional<Duration> lateMessageRejectionPeriod;
private final Optional<Duration> earlyMessageRejectionPeriod;
private final Optional<DateTime> lateMessageRejectionStartDateTime;
+ @Nullable private final AutoScalerConfig autoScalerConfig;
public SeekableStreamSupervisorIOConfig(
String stream,
@@ -59,13 +61,21 @@ public abstract class SeekableStreamSupervisorIOConfig
Period completionTimeout,
Period lateMessageRejectionPeriod,
Period earlyMessageRejectionPeriod,
+ @Nullable AutoScalerConfig autoScalerConfig,
DateTime lateMessageRejectionStartDateTime
)
{
this.stream = Preconditions.checkNotNull(stream, "stream cannot be null");
this.inputFormat = inputFormat;
this.replicas = replicas != null ? replicas : 1;
- this.taskCount = taskCount != null ? taskCount : 1;
+ // May be null.
+ this.autoScalerConfig = autoScalerConfig;
+ // If the autoscaler is enabled, taskCount is ignored and the initial taskCount equals taskCountMin.
+ if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler()) {
+ this.taskCount = autoScalerConfig.getTaskCountMin();
+ } else {
+ this.taskCount = taskCount != null ? taskCount : 1;
+ }
this.taskDuration = defaultDuration(taskDuration, "PT1H");
this.startDelay = defaultDuration(startDelay, "PT5S");
this.period = defaultDuration(period, "PT30S");
@@ -113,12 +123,24 @@ public abstract class SeekableStreamSupervisorIOConfig
return replicas;
}
+ @Nullable
+ @JsonProperty
+ public AutoScalerConfig getAutoscalerConfig()
+ {
+ return autoScalerConfig;
+ }
+
@JsonProperty
public Integer getTaskCount()
{
return taskCount;
}
+ public void setTaskCount(final int taskCount)
+ {
+ this.taskCount = taskCount;
+ }
+
@JsonProperty
public Duration getTaskDuration()
{
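With the autoscaler enabled, the IO config replaces the configured `taskCount` with `taskCountMin`, while the supervisor sizes its default worker and chat thread pools from `taskCountMax` (capped at 10), presumably so the pools, which are created once in the constructor, already fit the largest task count the autoscaler can reach. A sketch of both rules as pure functions; `AutoScaleSizing` is a hypothetical helper, not a Druid class.

```java
// Hypothetical helper mirroring two decisions in the diff: the initial taskCount
// (SeekableStreamSupervisorIOConfig) and default thread-pool sizes (SeekableStreamSupervisor).
final class AutoScaleSizing
{
  private AutoScaleSizing()
  {
  }

  static int initialTaskCount(Integer configuredTaskCount, boolean autoScalerEnabled, int taskCountMin)
  {
    if (autoScalerEnabled) {
      return taskCountMin; // the configured taskCount is ignored
    }
    return configuredTaskCount != null ? configuredTaskCount : 1;
  }

  static int defaultWorkerThreads(Integer configured, boolean autoScalerEnabled, int taskCount, int taskCountMax)
  {
    if (configured != null) {
      return configured; // an explicit tuningConfig value always wins
    }
    return Math.min(10, autoScalerEnabled ? taskCountMax : taskCount);
  }

  static int defaultChatThreads(
      Integer configured,
      boolean autoScalerEnabled,
      int taskCount,
      int taskCountMax,
      int replicas
  )
  {
    if (configured != null) {
      return configured;
    }
    return Math.min(10, (autoScalerEnabled ? taskCountMax : taskCount) * replicas);
  }
}
```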
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index 50be464..ff1d317 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -32,7 +32,10 @@ import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
@@ -151,6 +154,21 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
@Override
public abstract Supervisor createSupervisor();
+ /**
+ * Returns an autoscaler built from the autoScalerConfig. If autoScalerConfig is null, the autoscaler is disabled, or the supervisor is not a SeekableStreamSupervisor, a NoopTaskAutoScaler is returned.
+ * @param supervisor the supervisor the autoscaler will manage
+ * @return autoScaler
+ */
+ @Override
+ public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
+ {
+ AutoScalerConfig autoScalerConfig = ingestionSchema.getIOConfig().getAutoscalerConfig();
+ if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler() && supervisor instanceof SeekableStreamSupervisor) {
+ return autoScalerConfig.createAutoScaler(supervisor, this);
+ }
+ return new NoopTaskAutoScaler();
+ }
+
@Override
public List<String> getDataSources()
{
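`createAutoscaler()` applies the null-object pattern: when `autoScalerConfig` is null or disabled, or the supervisor is not a `SeekableStreamSupervisor`, it returns a `NoopTaskAutoScaler` rather than null, so for this spec the `if (autoscaler != null)` checks in `SupervisorManager` are purely defensive. A minimal sketch with hypothetical `ScalerConfig`, `Scaler`, `NoopScaler` and `ScalerFactory` types; the boolean parameter stands in for the `instanceof SeekableStreamSupervisor` check.

```java
// Hypothetical stand-in for AutoScalerConfig.
interface ScalerConfig
{
  boolean isEnabled();

  Scaler create();
}

// Hypothetical stand-in for SupervisorTaskAutoScaler.
interface Scaler
{
  void start();

  void stop();

  void reset();
}

// Null object: every lifecycle call is a no-op, so callers need no null checks.
final class NoopScaler implements Scaler
{
  @Override
  public void start()
  {
  }

  @Override
  public void stop()
  {
  }

  @Override
  public void reset()
  {
  }
}

final class ScalerFactory
{
  private ScalerFactory()
  {
  }

  static Scaler createAutoscaler(ScalerConfig config, boolean supervisorSupportsScaling)
  {
    if (config != null && config.isEnabled() && supervisorSupportsScaling) {
      return config.create();
    }
    return new NoopScaler();
  }
}
```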
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
new file mode 100644
index 0000000..53174a1
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.guice.annotations.UnstableApi;
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+
+@UnstableApi
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "autoScalerStrategy", defaultImpl = LagBasedAutoScalerConfig.class)
+@JsonSubTypes(value = {
+ @Type(name = "lagBased", value = LagBasedAutoScalerConfig.class)
+})
+public interface AutoScalerConfig
+{
+ boolean getEnableTaskAutoScaler();
+ long getMinTriggerScaleActionFrequencyMillis();
+ int getTaskCountMax();
+ int getTaskCountMin();
+ SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec);
+}
+
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
new file mode 100644
index 0000000..8c64527
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+ private static final EmittingLogger log = new EmittingLogger(LagBasedAutoScaler.class);
+ private final String dataSource;
+ private final CircularFifoQueue<Long> lagMetricsQueue;
+ private final ScheduledExecutorService lagComputationExec;
+ private final ScheduledExecutorService allocationExec;
+ private final SupervisorSpec spec;
+ private final SeekableStreamSupervisor supervisor;
+ private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
+
+ private static final ReentrantLock LOCK = new ReentrantLock(true);
+
+ public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String dataSource,
+ LagBasedAutoScalerConfig autoScalerConfig, SupervisorSpec spec
+ )
+ {
+ this.lagBasedAutoScalerConfig = autoScalerConfig;
+ final String supervisorId = StringUtils.format("Supervisor-%s", dataSource);
+ this.dataSource = dataSource;
+ final int slots = (int) (lagBasedAutoScalerConfig.getLagCollectionRangeMillis() / lagBasedAutoScalerConfig
+ .getLagCollectionIntervalMillis()) + 1;
+ this.lagMetricsQueue = new CircularFifoQueue<>(slots);
+ this.allocationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Allocation-%d");
+ this.lagComputationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Computation-%d");
+ this.spec = spec;
+ this.supervisor = supervisor;
+ }
+
+ @Override
+ public void start()
+ {
+ Callable<Integer> scaleAction = () -> {
+ LOCK.lock();
+ int desiredTaskCount = -1;
+ try {
+ desiredTaskCount = computeDesiredTaskCount(new ArrayList<>(lagMetricsQueue));
+
+ if (desiredTaskCount != -1) {
+ lagMetricsQueue.clear();
+ }
+ }
+ catch (Exception ex) {
+ log.warn(ex, "Exception while computing desired task count for [%s]", dataSource);
+ }
+ finally {
+ LOCK.unlock();
+ }
+ return desiredTaskCount;
+ };
+
+ lagComputationExec.scheduleAtFixedRate(
+ computeAndCollectLag(),
+ lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for tasks to start up
+ lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
+ TimeUnit.MILLISECONDS
+ );
+ allocationExec.scheduleAtFixedRate(
+ supervisor.buildDynamicAllocationTask(scaleAction),
+ lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + lagBasedAutoScalerConfig
+ .getLagCollectionRangeMillis(),
+ lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),
+ TimeUnit.MILLISECONDS
+ );
+ log.info(
+ "LagBasedAutoScaler will collect lag every [%d] millis and will keep [%d] data points for the last [%d] millis for dataSource [%s]",
+ lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), lagMetricsQueue.maxSize(),
+ lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), dataSource
+ );
+ }
+
+ @Override
+ public void stop()
+ {
+ allocationExec.shutdownNow();
+ lagComputationExec.shutdownNow();
+ }
+
+ @Override
+ public void reset()
+ {
+ // clear the queue of collected stream lags
+ if (lagMetricsQueue != null) {
+ LOCK.lock();
+ try {
+ lagMetricsQueue.clear();
+ }
+ catch (Exception e) {
+ log.warn(e, "Error while clearing the lag queue during reset for dataSource [%s]", dataSource);
+ }
+ finally {
+ LOCK.unlock();
+ }
+ }
+ }
+
+ /**
+ * This method computes the current consumer lag: it sums the lag of all partitions and offers the total to lagMetricsQueue.
+ *
+ * @return a Runnable that computes and collects lag.
+ */
+ private Runnable computeAndCollectLag()
+ {
+ return () -> {
+ LOCK.lock();
+ try {
+ if (!spec.isSuspended()) {
+ LagStats lagStats = supervisor.computeLagStats();
+ if (lagStats == null) {
+ lagMetricsQueue.offer(0L);
+ } else {
+ long totalLags = lagStats.getTotalLag();
+ lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L);
+ }
+ log.debug("Current lags [%s] for dataSource [%s].", new ArrayList<>(lagMetricsQueue), dataSource);
+ } else {
+ log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource);
+ }
+ }
+ catch (Exception e) {
+ log.error(e, "Error while collecting lags");
+ }
+ finally {
+ LOCK.unlock();
+ }
+ };
+ }
+
+ /**
+ * This method determines whether to perform a scale action based on the collected lag points.
+ * The current scaling algorithm is simple:
+ * First, compute the proportion of lag points at or above scaleOutThreshold (beyondProportion) and at or below scaleInThreshold (withinProportion).
+ * Second, compare beyondProportion/withinProportion with triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold. Scale out takes priority over scale in.
+ * Finally, if beyondProportion/withinProportion is at least triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold, a scale out/in action is triggered.
+ *
+ * @param lags the lag metrics of the stream (Kafka/Kinesis)
+ * @return the target task count, or -1 to skip the scale action.
+ */
+ private int computeDesiredTaskCount(List<Long> lags)
+ {
+ log.debug("Computing desired task count for [%s], based on the following lags: [%s]", dataSource, lags);
+ // Count the lag points at or above the scale-out threshold (beyond)
+ // and the lag points at or below the scale-in threshold (within).
+ int beyond = 0;
+ int within = 0;
+ int metricsCount = lags.size();
+ for (Long lag : lags) {
+ if (lag >= lagBasedAutoScalerConfig.getScaleOutThreshold()) {
+ beyond++;
+ }
+ if (lag <= lagBasedAutoScalerConfig.getScaleInThreshold()) {
+ within++;
+ }
+ }
+ double beyondProportion = beyond * 1.0 / metricsCount;
+ double withinProportion = within * 1.0 / metricsCount;
+
+ log.debug("Calculated beyondProportion is [%s] and withinProportion is [%s] for dataSource [%s].", beyondProportion,
+ withinProportion, dataSource
+ );
+
+ int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+ int desiredActiveTaskCount;
+
+ if (beyondProportion >= lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
+ // Do Scale out
+ int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep();
+
+ int partitionCount = supervisor.getPartitionCount();
+ if (partitionCount <= 0) {
+ log.warn("Partition count for [%s] is [%d], expected > 0; skipping scale-out action.", dataSource, partitionCount);
+ return -1;
+ }
+
+ int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
+ if (currentActiveTaskCount == actualTaskCountMax) {
+ log.warn("Current active task count has reached the maximum (min of taskCountMax and partition count), skipping scale-out action for dataSource [%s].",
+ dataSource
+ );
+ return -1;
+ } else {
+ desiredActiveTaskCount = Math.min(taskCount, actualTaskCountMax);
+ }
+ return desiredActiveTaskCount;
+ }
+
+ if (withinProportion >= lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
+ // Do Scale in
+ int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
+ if (currentActiveTaskCount == lagBasedAutoScalerConfig.getTaskCountMin()) {
+ log.warn("Current active task count has reached taskCountMin, skipping scale-in action for dataSource [%s].",
+ dataSource
+ );
+ return -1;
+ } else {
+ desiredActiveTaskCount = Math.max(taskCount, lagBasedAutoScalerConfig.getTaskCountMin());
+ }
+ return desiredActiveTaskCount;
+ }
+ return -1;
+ }
+
+ public LagBasedAutoScalerConfig getAutoScalerConfig()
+ {
+ return lagBasedAutoScalerConfig;
+ }
+}
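The decision rule documented on computeDesiredTaskCount can be exercised in isolation. The following is a minimal sketch, not Druid code: the class name LagScaleDecisionSketch and its parameter list are hypothetical, and the supervisor lookups (getActiveTaskGroupsCount, getPartitionCount) and locking are replaced by plain arguments. It mirrors the same ordering (scale out checked before scale in), the same `>=`/`<=` comparisons, and the same -1 "skip" value.

```java
import java.util.List;

// Hypothetical standalone sketch of the lag-based decision rule; not part of Druid.
final class LagScaleDecisionSketch
{
  static final int SKIP = -1;

  static int desiredTaskCount(
      List<Long> lags,
      int currentTaskCount,
      int partitionCount,
      long scaleOutThreshold,
      long scaleInThreshold,
      double triggerScaleOutFraction,
      double triggerScaleInFraction,
      int scaleOutStep,
      int scaleInStep,
      int taskCountMax,
      int taskCountMin
  )
  {
    if (lags.isEmpty()) {
      return SKIP; // no lag points collected yet
    }
    int beyond = 0;
    int within = 0;
    for (long lag : lags) {
      if (lag >= scaleOutThreshold) {
        beyond++;
      }
      if (lag <= scaleInThreshold) {
        within++;
      }
    }
    double beyondProportion = (double) beyond / lags.size();
    double withinProportion = (double) within / lags.size();

    // Scale out is evaluated first, so it wins when both fractions qualify.
    if (beyondProportion >= triggerScaleOutFraction) {
      if (partitionCount <= 0) {
        return SKIP;
      }
      // More tasks than partitions would leave tasks idle, so cap by partition count.
      int max = Math.min(taskCountMax, partitionCount);
      if (currentTaskCount == max) {
        return SKIP;
      }
      return Math.min(currentTaskCount + scaleOutStep, max);
    }
    if (withinProportion >= triggerScaleInFraction) {
      if (currentTaskCount == taskCountMin) {
        return SKIP;
      }
      return Math.max(currentTaskCount - scaleInStep, taskCountMin);
    }
    return SKIP;
  }
}
```

With the config defaults (scaleOutThreshold 6000000, scaleInThreshold 1000000, fractions 0.3/0.9, scaleOutStep 2, scaleInStep 1), three of four lag points above 6000000 move 2 tasks to 4 when taskCountMax is 8 and there are 10 partitions, while ten zero-lag points move 3 tasks down to 2.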
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
new file mode 100644
index 0000000..0a4eb0b
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+
+import javax.annotation.Nullable;
+
+public class LagBasedAutoScalerConfig implements AutoScalerConfig
+{
+ private final long lagCollectionIntervalMillis;
+ private final long lagCollectionRangeMillis;
+ private final long scaleActionStartDelayMillis;
+ private final long scaleActionPeriodMillis;
+ private final long scaleOutThreshold;
+ private final long scaleInThreshold;
+ private final double triggerScaleOutFractionThreshold;
+ private final double triggerScaleInFractionThreshold;
+ private int taskCountMax;
+ private int taskCountMin;
+ private final int scaleInStep;
+ private final int scaleOutStep;
+ private final boolean enableTaskAutoScaler;
+ private final long minTriggerScaleActionFrequencyMillis;
+
+ @JsonCreator
+ public LagBasedAutoScalerConfig(
+ @Nullable @JsonProperty("lagCollectionIntervalMillis") Long lagCollectionIntervalMillis,
+ @Nullable @JsonProperty("lagCollectionRangeMillis") Long lagCollectionRangeMillis,
+ @Nullable @JsonProperty("scaleActionStartDelayMillis") Long scaleActionStartDelayMillis,
+ @Nullable @JsonProperty("scaleActionPeriodMillis") Long scaleActionPeriodMillis,
+ @Nullable @JsonProperty("scaleOutThreshold") Long scaleOutThreshold,
+ @Nullable @JsonProperty("scaleInThreshold") Long scaleInThreshold,
+ @Nullable @JsonProperty("triggerScaleOutFractionThreshold") Double triggerScaleOutFractionThreshold,
+ @Nullable @JsonProperty("triggerScaleInFractionThreshold") Double triggerScaleInFractionThreshold,
+ @JsonProperty("taskCountMax") Integer taskCountMax,
+ @JsonProperty("taskCountMin") Integer taskCountMin,
+ @Nullable @JsonProperty("scaleInStep") Integer scaleInStep,
+ @Nullable @JsonProperty("scaleOutStep") Integer scaleOutStep,
+ @Nullable @JsonProperty("enableTaskAutoScaler") Boolean enableTaskAutoScaler,
+ @Nullable @JsonProperty("minTriggerScaleActionFrequencyMillis") Long minTriggerScaleActionFrequencyMillis
+ )
+ {
+ this.enableTaskAutoScaler = enableTaskAutoScaler != null ? enableTaskAutoScaler : false;
+ this.lagCollectionIntervalMillis = lagCollectionIntervalMillis != null ? lagCollectionIntervalMillis : 30000;
+ this.lagCollectionRangeMillis = lagCollectionRangeMillis != null ? lagCollectionRangeMillis : 600000;
+ this.scaleActionStartDelayMillis = scaleActionStartDelayMillis != null ? scaleActionStartDelayMillis : 300000;
+ this.scaleActionPeriodMillis = scaleActionPeriodMillis != null ? scaleActionPeriodMillis : 60000;
+ this.scaleOutThreshold = scaleOutThreshold != null ? scaleOutThreshold : 6000000;
+ this.scaleInThreshold = scaleInThreshold != null ? scaleInThreshold : 1000000;
+ this.triggerScaleOutFractionThreshold = triggerScaleOutFractionThreshold != null ? triggerScaleOutFractionThreshold : 0.3;
+ this.triggerScaleInFractionThreshold = triggerScaleInFractionThreshold != null ? triggerScaleInFractionThreshold : 0.9;
+
+ // Only validate taskCountMax and taskCountMin when the autoscaler is enabled, so that users who leave the autoscaler config empty ({}) get no exception and the autoscaler stays disabled.
+ // If the autoscaler is disabled, none of these configs are used.
+ if (this.enableTaskAutoScaler) {
+ if (taskCountMax == null || taskCountMin == null) {
+ throw new RuntimeException("taskCountMax or taskCountMin can't be null!");
+ } else if (taskCountMax < taskCountMin) {
+ throw new RuntimeException("taskCountMax can't be lower than taskCountMin!");
+ }
+ this.taskCountMax = taskCountMax;
+ this.taskCountMin = taskCountMin;
+ }
+
+ this.scaleInStep = scaleInStep != null ? scaleInStep : 1;
+ this.scaleOutStep = scaleOutStep != null ? scaleOutStep : 2;
+ this.minTriggerScaleActionFrequencyMillis = minTriggerScaleActionFrequencyMillis
+ != null ? minTriggerScaleActionFrequencyMillis : 600000;
+ }
+
+ @JsonProperty
+ public long getLagCollectionIntervalMillis()
+ {
+ return lagCollectionIntervalMillis;
+ }
+
+ @JsonProperty
+ public long getLagCollectionRangeMillis()
+ {
+ return lagCollectionRangeMillis;
+ }
+
+ @JsonProperty
+ public long getScaleActionStartDelayMillis()
+ {
+ return scaleActionStartDelayMillis;
+ }
+
+ @JsonProperty
+ public long getScaleActionPeriodMillis()
+ {
+ return scaleActionPeriodMillis;
+ }
+
+ @JsonProperty
+ public long getScaleOutThreshold()
+ {
+ return scaleOutThreshold;
+ }
+
+ @JsonProperty
+ public long getScaleInThreshold()
+ {
+ return scaleInThreshold;
+ }
+
+ @JsonProperty
+ public double getTriggerScaleOutFractionThreshold()
+ {
+ return triggerScaleOutFractionThreshold;
+ }
+
+ @JsonProperty
+ public double getTriggerScaleInFractionThreshold()
+ {
+ return triggerScaleInFractionThreshold;
+ }
+
+ @Override
+ @JsonProperty
+ public int getTaskCountMax()
+ {
+ return taskCountMax;
+ }
+
+ @Override
+ @JsonProperty
+ public int getTaskCountMin()
+ {
+ return taskCountMin;
+ }
+
+ @Override
+ public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec)
+ {
+ return new LagBasedAutoScaler((SeekableStreamSupervisor) supervisor, spec.getId(), this, spec);
+ }
+
+ @JsonProperty
+ public int getScaleInStep()
+ {
+ return scaleInStep;
+ }
+
+ @JsonProperty
+ public int getScaleOutStep()
+ {
+ return scaleOutStep;
+ }
+
+ @Override
+ @JsonProperty
+ public boolean getEnableTaskAutoScaler()
+ {
+ return enableTaskAutoScaler;
+ }
+
+ @Override
+ @JsonProperty
+ public long getMinTriggerScaleActionFrequencyMillis()
+ {
+ return minTriggerScaleActionFrequencyMillis;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "autoScalerConfig{" +
+ "enableTaskAutoScaler=" + enableTaskAutoScaler +
+ ", taskCountMax=" + taskCountMax +
+ ", taskCountMin=" + taskCountMin +
+ ", minTriggerScaleActionFrequencyMillis=" + minTriggerScaleActionFrequencyMillis +
+ ", lagCollectionIntervalMillis=" + lagCollectionIntervalMillis +
+ ", lagCollectionRangeMillis=" + lagCollectionRangeMillis +
+ ", scaleOutThreshold=" + scaleOutThreshold +
+ ", triggerScaleOutFractionThreshold=" + triggerScaleOutFractionThreshold +
+ ", scaleInThreshold=" + scaleInThreshold +
+ ", triggerScaleInFractionThreshold=" + triggerScaleInFractionThreshold +
+ ", scaleActionStartDelayMillis=" + scaleActionStartDelayMillis +
+ ", scaleActionPeriodMillis=" + scaleActionPeriodMillis +
+ ", scaleInStep=" + scaleInStep +
+ ", scaleOutStep=" + scaleOutStep +
+ '}';
+ }
+}
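LagBasedAutoScaler sizes its lag window as lagCollectionRangeMillis / lagCollectionIntervalMillis + 1 slots, so the defaults above (600000 and 30000 ms) keep 21 samples, and the first scale action is scheduled after scaleActionStartDelayMillis + lagCollectionRangeMillis (300000 + 600000 = 900000 ms by default). The sketch below is an assumption-labeled illustration, not Druid code: the hypothetical LagWindowSketch class uses java.util.ArrayDeque with manual eviction in place of commons-collections4's CircularFifoQueue, and clamps negative totals to 0 the way computeAndCollectLag does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the bounded lag window; not part of Druid.
final class LagWindowSketch
{
  private final int capacity;
  private final ArrayDeque<Long> samples;

  LagWindowSketch(long lagCollectionRangeMillis, long lagCollectionIntervalMillis)
  {
    this.capacity = slots(lagCollectionRangeMillis, lagCollectionIntervalMillis);
    this.samples = new ArrayDeque<>(capacity);
  }

  // Same sizing formula as the LagBasedAutoScaler constructor.
  static int slots(long rangeMillis, long intervalMillis)
  {
    return (int) (rangeMillis / intervalMillis) + 1;
  }

  void offer(long totalLag)
  {
    if (samples.size() == capacity) {
      samples.pollFirst(); // evict the oldest sample, as a circular FIFO would
    }
    samples.offerLast(Math.max(totalLag, 0L)); // negative totals are recorded as 0
  }

  int capacity()
  {
    return capacity;
  }

  List<Long> snapshot()
  {
    return new ArrayList<>(samples);
  }
}
```

Because the scale action runs every scaleActionPeriodMillis and clears the queue only when it decides to scale, the window can span up to lagCollectionRangeMillis of samples between decisions.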
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/NoopTaskAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/NoopTaskAutoScaler.java
new file mode 100644
index 0000000..9bf41e5
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/NoopTaskAutoScaler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+
+public class NoopTaskAutoScaler implements SupervisorTaskAutoScaler
+{
+ public NoopTaskAutoScaler()
+ {
+ }
+
+ @Override
+ public void start()
+ {
+ //Do nothing
+ }
+
+ @Override
+ public void stop()
+ {
+ //Do nothing
+ }
+
+ @Override
+ public void reset()
+ {
+ //Do nothing
+ }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java
index e603a55..3b775ca 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java
@@ -32,6 +32,8 @@ import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorResource;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler;
import org.apache.druid.indexing.worker.http.WorkerResource;
import org.apache.druid.server.http.security.ResourceFilterTestHelper;
import org.apache.druid.server.security.AuthorizerMapper;
@@ -127,6 +129,12 @@ public class OverlordSecurityResourceFilterTest extends ResourceFilterTestHelper
}
@Override
+ public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
+ {
+ return new NoopTaskAutoScaler();
+ }
+
+ @Override
public List<String> getDataSources()
{
return ImmutableList.of("test");
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index 1390e7a..c22460e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.TaskMaster;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.security.Access;
@@ -1156,6 +1157,12 @@ public class SupervisorResourceTest extends EasyMockSupport
}
@Override
+ public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
+ {
+ return null;
+ }
+
+ @Override
public List<String> getDataSources()
{
return datasources;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
new file mode 100644
index 0000000..51a39fc
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -0,0 +1,950 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskMaster;
+import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScaler;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ScheduledExecutorService;
+
+public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
+{
+ private SeekableStreamSupervisorIngestionSpec ingestionSchema;
+ private TaskStorage taskStorage;
+ private TaskMaster taskMaster;
+ private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
+ private ServiceEmitter emitter;
+ private RowIngestionMetersFactory rowIngestionMetersFactory;
+ private DataSchema dataSchema;
+ private SeekableStreamSupervisorTuningConfig seekableStreamSupervisorTuningConfig;
+ private SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig;
+ private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
+ private SeekableStreamIndexTaskClientFactory taskClientFactory;
+ private static final String STREAM = "stream";
+ private static final String DATASOURCE = "testDS";
+ private SeekableStreamSupervisorSpec spec;
+ private SupervisorStateManagerConfig supervisorConfig;
+
+ private SeekableStreamSupervisor supervisor4;
+
+ private SeekableStreamIndexTaskClientFactory indexTaskClientFactory;
+ private ObjectMapper mapper;
+ private DruidMonitorSchedulerConfig monitorSchedulerConfig;
+ private SupervisorStateManagerConfig supervisorStateManagerConfig;
+
+ @Before
+ public void setUp()
+ {
+ ingestionSchema = EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class);
+ taskStorage = EasyMock.mock(TaskStorage.class);
+ taskMaster = EasyMock.mock(TaskMaster.class);
+ indexerMetadataStorageCoordinator = EasyMock.mock(IndexerMetadataStorageCoordinator.class);
+ emitter = EasyMock.mock(ServiceEmitter.class);
+ rowIngestionMetersFactory = EasyMock.mock(RowIngestionMetersFactory.class);
+ dataSchema = EasyMock.mock(DataSchema.class);
+ seekableStreamSupervisorTuningConfig = EasyMock.mock(SeekableStreamSupervisorTuningConfig.class);
+ seekableStreamSupervisorIOConfig = EasyMock.mock(SeekableStreamSupervisorIOConfig.class);
+ taskClientFactory = EasyMock.mock(SeekableStreamIndexTaskClientFactory.class);
+ spec = EasyMock.mock(SeekableStreamSupervisorSpec.class);
+ supervisorConfig = new SupervisorStateManagerConfig();
+ indexTaskClientFactory = EasyMock.mock(SeekableStreamIndexTaskClientFactory.class);
+ mapper = new DefaultObjectMapper();
+ monitorSchedulerConfig = EasyMock.mock(DruidMonitorSchedulerConfig.class);
+ supervisorStateManagerConfig = EasyMock.mock(SupervisorStateManagerConfig.class);
+ supervisor4 = EasyMock.mock(SeekableStreamSupervisor.class);
+ }
+
+ private abstract class BaseTestSeekableStreamSupervisor extends SeekableStreamSupervisor<String, String, ByteEntity>
+ {
+ private BaseTestSeekableStreamSupervisor()
+ {
+ super(
+ "testSupervisorId",
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ taskClientFactory,
+ OBJECT_MAPPER,
+ spec,
+ rowIngestionMetersFactory,
+ false
+ );
+ }
+
+ @Override
+ protected String baseTaskName()
+ {
+ return "test";
+ }
+
+ @Override
+ protected void updatePartitionLagFromStream()
+ {
+ // do nothing
+ }
+
+ @Nullable
+ @Override
+ protected Map<String, Long> getPartitionRecordLag()
+ {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ protected Map<String, Long> getPartitionTimeLag()
+ {
+ return null;
+ }
+
+ @Override
+ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
+ int groupId,
+ Map<String, String> startPartitions,
+ Map<String, String> endPartitions,
+ String baseSequenceName,
+ DateTime minimumMessageTime,
+ DateTime maximumMessageTime,
+ Set<String> exclusiveStartSequenceNumberPartitions,
+ SeekableStreamSupervisorIOConfig ioConfig
+ )
+ {
+ return new SeekableStreamIndexTaskIOConfig<String, String>(
+ groupId,
+ baseSequenceName,
+ new SeekableStreamStartSequenceNumbers<>(STREAM, startPartitions, exclusiveStartSequenceNumberPartitions),
+ new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions),
+ true,
+ minimumMessageTime,
+ maximumMessageTime,
+ ioConfig.getInputFormat()
+ )
+ {
+ };
+ }
+
+ @Override
+ protected List<SeekableStreamIndexTask<String, String, ByteEntity>> createIndexTasks(
+ int replicas,
+ String baseSequenceName,
+ ObjectMapper sortingMapper,
+ TreeMap<Integer, Map<String, String>> sequenceOffsets,
+ SeekableStreamIndexTaskIOConfig taskIoConfig,
+ SeekableStreamIndexTaskTuningConfig taskTuningConfig,
+ RowIngestionMetersFactory rowIngestionMetersFactory
+ )
+ {
+ return null;
+ }
+
+ @Override
+ protected int getTaskGroupIdForPartition(String partition)
+ {
+ return 0;
+ }
+
+ @Override
+ protected boolean checkSourceMetadataMatch(DataSourceMetadata metadata)
+ {
+ return true;
+ }
+
+ @Override
+ protected boolean doesTaskTypeMatchSupervisor(Task task)
+ {
+ return true;
+ }
+
+ @Override
+ protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(
+ String stream,
+ Map<String, String> map
+ )
+ {
+ return null;
+ }
+
+ @Override
+ protected OrderedSequenceNumber<String> makeSequenceNumber(String seq, boolean isExclusive)
+ {
+ return new OrderedSequenceNumber<String>(seq, isExclusive)
+ {
+ @Override
+ public int compareTo(OrderedSequenceNumber<String> o)
+ {
+ return new BigInteger(this.get()).compareTo(new BigInteger(o.get()));
+ }
+ };
+ }
+
+ @Override
+ protected Map<String, Long> getRecordLagPerPartition(Map<String, String> currentOffsets)
+ {
+ return null;
+ }
+
+ @Override
+ protected Map<String, Long> getTimeLagPerPartition(Map<String, String> currentOffsets)
+ {
+ return null;
+ }
+
+ @Override
+ protected RecordSupplier<String, String, ByteEntity> setupRecordSupplier()
+ {
+ return recordSupplier;
+ }
+
+ @Override
+ protected SeekableStreamSupervisorReportPayload<String, String> createReportPayload(
+ int numPartitions,
+ boolean includeOffsets
+ )
+ {
+ return new SeekableStreamSupervisorReportPayload<String, String>(
+ DATASOURCE,
+ STREAM,
+ 1,
+ 1,
+ 1L,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ false,
+ true,
+ null,
+ null,
+ null
+ )
+ {
+ };
+ }
+
+ @Override
+ protected String getNotSetMarker()
+ {
+ return "NOT_SET";
+ }
+
+ @Override
+ protected String getEndOfPartitionMarker()
+ {
+ return "EOF";
+ }
+
+ @Override
+ protected boolean isEndOfShard(String seqNum)
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean isShardExpirationMarker(String seqNum)
+ {
+ return false;
+ }
+
+ @Override
+ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
+ {
+ return false;
+ }
+ }
+
+ private class TestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor
+ {
+ private int partitionNumbers;
+
+ public TestSeekableStreamSupervisor(int partitionNumbers)
+ {
+ this.partitionNumbers = partitionNumbers;
+ }
+
+ @Override
+ protected void scheduleReporting(ScheduledExecutorService reportingExec)
+ {
+ // do nothing
+ }
+
+ @Override
+ public LagStats computeLagStats()
+ {
+ return new LagStats(0, 0, 0);
+ }
+
+ @Override
+ public int getPartitionCount()
+ {
+ return partitionNumbers;
+ }
+ }
+
+
+ private static class TestSeekableStreamSupervisorSpec extends SeekableStreamSupervisorSpec
+ {
+ private SeekableStreamSupervisor supervisor;
+ private String id;
+
+ public TestSeekableStreamSupervisorSpec(SeekableStreamSupervisorIngestionSpec ingestionSchema,
+ @Nullable Map<String, Object> context,
+ Boolean suspended,
+ TaskStorage taskStorage,
+ TaskMaster taskMaster,
+ IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
+ SeekableStreamIndexTaskClientFactory indexTaskClientFactory,
+ ObjectMapper mapper,
+ ServiceEmitter emitter,
+ DruidMonitorSchedulerConfig monitorSchedulerConfig,
+ RowIngestionMetersFactory rowIngestionMetersFactory,
+ SupervisorStateManagerConfig supervisorStateManagerConfig,
+ SeekableStreamSupervisor supervisor,
+ String id)
+ {
+ super(
+ ingestionSchema,
+ context,
+ suspended,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig);
+
+ this.supervisor = supervisor;
+ this.id = id;
+ }
+
+ @Override
+ public List<String> getDataSources()
+ {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public String getId()
+ {
+ return id;
+ }
+
+ @Override
+ public Supervisor createSupervisor()
+ {
+ return supervisor;
+ }
+
+ @Override
+ public String getType()
+ {
+ return null;
+ }
+
+ @Override
+ public String getSource()
+ {
+ return null;
+ }
+
+ @Override
+ protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend)
+ {
+ return null;
+ }
+ }
+
+ private static SeekableStreamSupervisorTuningConfig getTuningConfig()
+ {
+ return new SeekableStreamSupervisorTuningConfig()
+ {
+ @Override
+ public Integer getWorkerThreads()
+ {
+ return 1;
+ }
+
+ @Override
+ public Integer getChatThreads()
+ {
+ return 1;
+ }
+
+ @Override
+ public Long getChatRetries()
+ {
+ return 1L;
+ }
+
+ @Override
+ public Duration getHttpTimeout()
+ {
+ return new Period("PT1M").toStandardDuration();
+ }
+
+ @Override
+ public Duration getShutdownTimeout()
+ {
+ return new Period("PT1S").toStandardDuration();
+ }
+
+ @Override
+ public Duration getRepartitionTransitionDuration()
+ {
+ return new Period("PT2M").toStandardDuration();
+ }
+
+ @Override
+ public Duration getOffsetFetchPeriod()
+ {
+ return new Period("PT5M").toStandardDuration();
+ }
+
+ @Override
+ public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig()
+ {
+ return new SeekableStreamIndexTaskTuningConfig(
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ )
+ {
+ @Override
+ public SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir)
+ {
+ return null;
+ }
+
+ @Override
+ public String toString()
+ {
+ return null;
+ }
+ };
+ }
+ };
+ }
+
+ @Test
+ public void testAutoScalerConfig()
+ {
+ AutoScalerConfig autoScalerConfigEmpty = mapper.convertValue(new HashMap<>(), AutoScalerConfig.class);
+ Assert.assertTrue(autoScalerConfigEmpty instanceof LagBasedAutoScalerConfig);
+ Assert.assertFalse(autoScalerConfigEmpty.getEnableTaskAutoScaler());
+
+ AutoScalerConfig autoScalerConfigNull = mapper.convertValue(null, AutoScalerConfig.class);
+ Assert.assertNull(autoScalerConfigNull);
+
+ AutoScalerConfig autoScalerConfigDefault = mapper.convertValue(ImmutableMap.of("autoScalerStrategy", "lagBased"), AutoScalerConfig.class);
+ Assert.assertTrue(autoScalerConfigDefault instanceof LagBasedAutoScalerConfig);
+
+ AutoScalerConfig autoScalerConfigValue = mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis", "1"), AutoScalerConfig.class);
+ Assert.assertTrue(autoScalerConfigValue instanceof LagBasedAutoScalerConfig);
+ LagBasedAutoScalerConfig lagBasedAutoScalerConfig = (LagBasedAutoScalerConfig) autoScalerConfigValue;
+ Assert.assertEquals(1, lagBasedAutoScalerConfig.getLagCollectionIntervalMillis());
+
+ // taskCountMin (4) is greater than taskCountMax (1), so the config must be rejected.
+ Exception e = null;
+ try {
+ AutoScalerConfig autoScalerError = mapper.convertValue(ImmutableMap.of("enableTaskAutoScaler", "true", "taskCountMax", "1", "taskCountMin", "4"), AutoScalerConfig.class);
+ }
+ catch (RuntimeException ex) {
+ e = ex;
+ }
+ Assert.assertNotNull(e);
+
+ e = null;
+ try {
+ // taskCountMax and taskCountMin must not be omitted when the autoscaler is enabled.
+ AutoScalerConfig autoScalerError2 = mapper.convertValue(ImmutableMap.of("enableTaskAutoScaler", "true"), AutoScalerConfig.class);
+ }
+ catch (RuntimeException ex) {
+ e = ex;
+ }
+ Assert.assertNotNull(e);
+
+
+ }
+
+ @Test
+ public void testAutoScalerCreated()
+ {
+ HashMap<String, Object> autoScalerConfig = new HashMap<>();
+ autoScalerConfig.put("enableTaskAutoScaler", true);
+ autoScalerConfig.put("lagCollectionIntervalMillis", 500);
+ autoScalerConfig.put("lagCollectionRangeMillis", 500);
+ autoScalerConfig.put("scaleOutThreshold", 5000000);
+ autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.3);
+ autoScalerConfig.put("scaleInThreshold", 1000000);
+ autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
+ autoScalerConfig.put("scaleActionStartDelayMillis", 0);
+ autoScalerConfig.put("scaleActionPeriodMillis", 100);
+ autoScalerConfig.put("taskCountMax", 8);
+ autoScalerConfig.put("taskCountMin", 1);
+ autoScalerConfig.put("scaleInStep", 1);
+ autoScalerConfig.put("scaleOutStep", 2);
+ autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
+
+ EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+ EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+ EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+ EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes();
+ EasyMock.replay(seekableStreamSupervisorIOConfig);
+
+ EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
+ EasyMock.replay(supervisor4);
+
+ TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(ingestionSchema,
+ null,
+ false,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig,
+ supervisor4,
+ "id1");
+ SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor4);
+ Assert.assertTrue(autoscaler instanceof LagBasedAutoScaler);
+
+ EasyMock.reset(seekableStreamSupervisorIOConfig);
+ autoScalerConfig.put("enableTaskAutoScaler", false);
+ EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes();
+ EasyMock.replay(seekableStreamSupervisorIOConfig);
+ SupervisorTaskAutoScaler autoscaler2 = spec.createAutoscaler(supervisor4);
+ Assert.assertTrue(autoscaler2 instanceof NoopTaskAutoScaler);
+
+ EasyMock.reset(seekableStreamSupervisorIOConfig);
+ autoScalerConfig.remove("enableTaskAutoScaler");
+ EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes();
+ EasyMock.replay(seekableStreamSupervisorIOConfig);
+ SupervisorTaskAutoScaler autoscaler3 = spec.createAutoscaler(supervisor4);
+ Assert.assertTrue(autoscaler3 instanceof NoopTaskAutoScaler);
+
+ EasyMock.reset(seekableStreamSupervisorIOConfig);
+ autoScalerConfig.clear();
+ EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes();
+ EasyMock.replay(seekableStreamSupervisorIOConfig);
+ Assert.assertTrue(autoScalerConfig.isEmpty());
+ SupervisorTaskAutoScaler autoscaler4 = spec.createAutoscaler(supervisor4);
+ Assert.assertTrue(autoscaler4 instanceof NoopTaskAutoScaler);
+
+ }
+
+ @Test
+ public void testDefaultAutoScalerConfigCreatedWithDefault()
+ {
+ EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+ EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+ EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+ EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis", "1", "enableTaskAutoScaler", true, "taskCountMax", "4", "taskCountMin", "1"), AutoScalerConfig.class)).anyTimes();
+ EasyMock.replay(seekableStreamSupervisorIOConfig);
+
+ EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
+ EasyMock.replay(supervisor4);
+
+ TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(ingestionSchema,
+ null,
+ false,
+ taskStorage,
+ taskMaster,
+ indexerMetadataStorageCoordinator,
+ indexTaskClientFactory,
+ mapper,
+ emitter,
+ monitorSchedulerConfig,
+ rowIngestionMetersFactory,
+ supervisorStateManagerConfig,
+ supervisor4,
+ "id1");
+ SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor4);
+ Assert.assertTrue(autoscaler instanceof LagBasedAutoScaler);
+ LagBasedAutoScaler lagBasedAutoScaler = (LagBasedAutoScaler) autoscaler;
+ LagBasedAutoScalerConfig lagBasedAutoScalerConfig = lagBasedAutoScaler.getAutoScalerConfig();
+ Assert.assertEquals(1, lagBasedAutoScalerConfig.getLagCollectionIntervalMillis());
+ Assert.assertEquals(600000, lagBasedAutoScalerConfig.getLagCollectionRangeMillis());
+ Assert.assertEquals(300000, lagBasedAutoScalerConfig.getScaleActionStartDelayMillis());
+ Assert.assertEquals(60000, lagBasedAutoScalerConfig.getScaleActionPeriodMillis());
+ Assert.assertEquals(6000000, lagBasedAutoScalerConfig.getScaleOutThreshold());
+ Assert.assertEquals(1000000, lagBasedAutoScalerConfig.getScaleInThreshold());
+ Assert.assertEquals(4, lagBasedAutoScalerConfig.getTaskCountMax());
+ Assert.assertEquals(1, lagBasedAutoScalerConfig.getTaskCountMin());
+ Assert.assertEquals(1, lagBasedAutoScalerConfig.getScaleInStep());
+ Assert.assertEquals(2, lagBasedAutoScalerConfig.getScaleOutStep());
+ Assert.assertEquals(600000, lagBasedAutoScalerConfig.getMinTriggerScaleActionFrequencyMillis());
+ }
+
+ @Test
+ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedException
+ {
+
+ EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
+ EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes();
+ EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+ EasyMock.replay(spec);
+
+ EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+ EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+ EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.replay(taskMaster);
+
+ TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3);
+
+ LagStats lagStats = supervisor.computeLagStats();
+
+ LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, mapper.convertValue(getScaleOutProperties(2), LagBasedAutoScalerConfig.class), spec);
+ supervisor.start();
+ autoScaler.start();
+ supervisor.runInternal();
+ int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
+ Assert.assertEquals(1, taskCountBeforeScaleOut);
+ Thread.sleep(1 * 1000);
+ int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
+ Assert.assertEquals(2, taskCountAfterScaleOut);
+
+ autoScaler.reset();
+ autoScaler.stop();
+
+ }
+
+ @Test
+ public void testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() throws InterruptedException
+ {
+
+ EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
+ EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes();
+ EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+ EasyMock.replay(spec);
+
+ EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+ EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+ EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.replay(taskMaster);
+
+ TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(2);
+
+ LagStats lagStats = supervisor.computeLagStats();
+
+ LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, mapper.convertValue(getScaleOutProperties(3), LagBasedAutoScalerConfig.class), spec);
+ supervisor.start();
+ autoScaler.start();
+ supervisor.runInternal();
+ int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
+ Assert.assertEquals(1, taskCountBeforeScaleOut);
+ Thread.sleep(1 * 1000);
+ int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
+ Assert.assertEquals(2, taskCountAfterScaleOut);
+
+ autoScaler.reset();
+ autoScaler.stop();
+
+ }
+
+ @Test
+ public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedException
+ {
+
+ EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
+ EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2, false)).anyTimes();
+ EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+ EasyMock.replay(spec);
+
+ EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+ EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+ EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.replay(taskMaster);
+
+ TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3);
+ LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, mapper.convertValue(getScaleInProperties(), LagBasedAutoScalerConfig.class), spec);
+
+ // With the autoscaler enabled, the configured taskCount is ignored and taskCount is initialized to taskCountMin.
+ Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount());
+ supervisor.getIoConfig().setTaskCount(2);
+ supervisor.start();
+ autoScaler.start();
+ supervisor.runInternal();
+ int taskCountBeforeScaleIn = supervisor.getIoConfig().getTaskCount();
+ Assert.assertEquals(2, taskCountBeforeScaleIn);
+ Thread.sleep(1 * 1000);
+ int taskCountAfterScaleIn = supervisor.getIoConfig().getTaskCount();
+ Assert.assertEquals(1, taskCountAfterScaleIn);
+
+ autoScaler.reset();
+ autoScaler.stop();
+ }
+
+ @Test
+ public void testSeekableStreamSupervisorSpecWithScaleDisable() throws InterruptedException
+ {
+
+ SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig = new SeekableStreamSupervisorIOConfig(
+ "stream",
+ new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
+ 1,
+ 1,
+ new Period("PT1H"),
+ new Period("P1D"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null, null, null
+ ) {};
+ EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
+ EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+ EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+ EasyMock.replay(spec);
+
+ EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(this.seekableStreamSupervisorIOConfig).anyTimes();
+ EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+ EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.replay(taskMaster);
+
+ TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3);
+ NoopTaskAutoScaler autoScaler = new NoopTaskAutoScaler();
+ supervisor.start();
+ autoScaler.start();
+ supervisor.runInternal();
+ int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
+ Assert.assertEquals(1, taskCountBeforeScaleOut);
+ Thread.sleep(1 * 1000);
+ int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
+ Assert.assertEquals(1, taskCountAfterScaleOut);
+
+ autoScaler.reset();
+ autoScaler.stop();
+ }
+
+ private static DataSchema getDataSchema()
+ {
+ List<DimensionSchema> dimensions = new ArrayList<>();
+ dimensions.add(StringDimensionSchema.create("dim1"));
+ dimensions.add(StringDimensionSchema.create("dim2"));
+
+ return new DataSchema(
+ DATASOURCE,
+ new TimestampSpec("timestamp", "iso", null),
+ new DimensionsSpec(
+ dimensions,
+ null,
+ null
+ ),
+ new AggregatorFactory[]{new CountAggregatorFactory("rows")},
+ new UniformGranularitySpec(
+ Granularities.HOUR,
+ Granularities.NONE,
+ ImmutableList.of()
+ ),
+ null
+ );
+ }
+
+ private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scaleOut)
+ {
+ if (scaleOut) {
+ return new SeekableStreamSupervisorIOConfig(
+ "stream",
+ new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
+ 1,
+ taskCount,
+ new Period("PT1H"),
+ new Period("P1D"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null, mapper.convertValue(getScaleOutProperties(2), AutoScalerConfig.class), null
+ ) {};
+ } else {
+ return new SeekableStreamSupervisorIOConfig(
+ "stream",
+ new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
+ 1,
+ taskCount,
+ new Period("PT1H"),
+ new Period("P1D"),
+ new Period("PT30S"),
+ false,
+ new Period("PT30M"),
+ null,
+ null, mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class), null
+ ) {};
+ }
+ }
+
+ private static Map<String, Object> getScaleOutProperties(int maxTaskCount)
+ {
+ HashMap<String, Object> autoScalerConfig = new HashMap<>();
+ autoScalerConfig.put("enableTaskAutoScaler", true);
+ autoScalerConfig.put("lagCollectionIntervalMillis", 500);
+ autoScalerConfig.put("lagCollectionRangeMillis", 500);
+ autoScalerConfig.put("scaleOutThreshold", 0);
+ autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.0);
+ autoScalerConfig.put("scaleInThreshold", 1000000);
+ autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
+ autoScalerConfig.put("scaleActionStartDelayMillis", 0);
+ autoScalerConfig.put("scaleActionPeriodMillis", 100);
+ autoScalerConfig.put("taskCountMax", maxTaskCount);
+ autoScalerConfig.put("taskCountMin", 1);
+ autoScalerConfig.put("scaleInStep", 1);
+ autoScalerConfig.put("scaleOutStep", 2);
+ autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
+ return autoScalerConfig;
+ }
+
+ private static Map<String, Object> getScaleInProperties()
+ {
+ HashMap<String, Object> autoScalerConfig = new HashMap<>();
+ autoScalerConfig.put("enableTaskAutoScaler", true);
+ autoScalerConfig.put("lagCollectionIntervalMillis", 500);
+ autoScalerConfig.put("lagCollectionRangeMillis", 500);
+ autoScalerConfig.put("scaleOutThreshold", 8000000);
+ autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.3);
+ autoScalerConfig.put("scaleInThreshold", 0);
+ autoScalerConfig.put("triggerScaleInFractionThreshold", 0.0);
+ autoScalerConfig.put("scaleActionStartDelayMillis", 0);
+ autoScalerConfig.put("scaleActionPeriodMillis", 100);
+ autoScalerConfig.put("taskCountMax", 2);
+ autoScalerConfig.put("taskCountMin", 1);
+ autoScalerConfig.put("scaleInStep", 1);
+ autoScalerConfig.put("scaleOutStep", 2);
+ autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
+ return autoScalerConfig;
+ }
+
+}
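The scale-out, small-partition and scale-in tests above pin down how the lag-based autoscaler is expected to react to its thresholds. The standalone Java sketch below reproduces only that expected behaviour. It assumes that the decision counts the fraction of lag samples at or above scaleOutThreshold (or at or below scaleInThreshold), compares that fraction with the matching trigger fraction, and clamps the result to taskCountMin and to the smaller of taskCountMax and the partition count. The class and method names are hypothetical, and this is not Druid's LagBasedAutoScaler implementation.

```java
import java.util.List;

// Hypothetical, simplified model of a lag-based scaling decision.
// It mirrors the outcomes asserted by the unit tests; it is NOT Druid's code.
public final class LagScalingSketch
{
  // Returns the desired task count, or the current count when no action is triggered.
  static int desiredTaskCount(
      List<Long> lagSamples,
      int currentTaskCount,
      int partitionCount,
      long scaleOutThreshold,
      double triggerScaleOutFractionThreshold,
      long scaleInThreshold,
      double triggerScaleInFractionThreshold,
      int taskCountMin,
      int taskCountMax,
      int scaleOutStep,
      int scaleInStep
  )
  {
    if (lagSamples.isEmpty()) {
      return currentTaskCount;
    }
    int beyond = 0; // samples at or above the scale-out threshold
    int within = 0; // samples at or below the scale-in threshold
    for (long lag : lagSamples) {
      if (lag >= scaleOutThreshold) {
        beyond++;
      }
      if (lag <= scaleInThreshold) {
        within++;
      }
    }
    double beyondFraction = (double) beyond / lagSamples.size();
    double withinFraction = (double) within / lagSamples.size();

    if (beyondFraction >= triggerScaleOutFractionThreshold) {
      // Never plan more tasks than taskCountMax or than there are partitions to read.
      int upperBound = Math.min(taskCountMax, partitionCount);
      return Math.min(currentTaskCount + scaleOutStep, upperBound);
    }
    if (withinFraction >= triggerScaleInFractionThreshold) {
      // Never drop below taskCountMin.
      return Math.max(currentTaskCount - scaleInStep, taskCountMin);
    }
    return currentTaskCount;
  }

  public static void main(String[] args)
  {
    List<Long> zeroLag = List.of(0L, 0L, 0L);
    // Values from getScaleOutProperties(2) with 3 partitions: prints 2
    System.out.println(desiredTaskCount(zeroLag, 1, 3, 0L, 0.0, 1_000_000L, 0.8, 1, 2, 2, 1));
    // Values from getScaleOutProperties(3) with 2 partitions: prints 2
    System.out.println(desiredTaskCount(zeroLag, 1, 2, 0L, 0.0, 1_000_000L, 0.8, 1, 3, 2, 1));
    // Values from getScaleInProperties() starting at 2 tasks: prints 1
    System.out.println(desiredTaskCount(zeroLag, 2, 3, 8_000_000L, 0.3, 0L, 0.0, 1, 2, 2, 1));
  }
}
```

The three calls in main use the property maps from the tests. They return 2, 2 and 1, the task counts asserted by testSeekableStreamSupervisorSpecWithScaleOut, testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber and testSeekableStreamSupervisorSpecWithScaleIn. In the second case, the partition count rather than taskCountMax caps the result.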
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 6adf4c8..42e8c55 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -43,6 +43,7 @@ import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager.BasicState;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
@@ -58,6 +59,7 @@ import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamState;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -82,9 +84,11 @@ import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
+
import java.io.File;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -765,6 +769,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
new Period("PT30M"),
null,
null,
+ null,
null
)
{
@@ -825,12 +830,32 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
false,
new Period("PT30M"),
null,
- null, null
+ null, OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class), null
)
{
};
}
+ private static Map<String, Object> getProperties()
+ {
+ HashMap<String, Object> autoScalerConfig = new HashMap<>();
+ autoScalerConfig.put("enableTaskAutoScaler", true);
+ autoScalerConfig.put("lagCollectionIntervalMillis", 500);
+ autoScalerConfig.put("lagCollectionRangeMillis", 500);
+ autoScalerConfig.put("scaleOutThreshold", 5000000);
+ autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.3);
+ autoScalerConfig.put("scaleInThreshold", 1000000);
+ autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
+ autoScalerConfig.put("scaleActionStartDelayMillis", 0);
+ autoScalerConfig.put("scaleActionPeriodMillis", 100);
+ autoScalerConfig.put("taskCountMax", 8);
+ autoScalerConfig.put("taskCountMin", 1);
+ autoScalerConfig.put("scaleInStep", 1);
+ autoScalerConfig.put("scaleOutStep", 2);
+ autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
+ return autoScalerConfig;
+ }
+
private static SeekableStreamSupervisorTuningConfig getTuningConfig()
{
return new SeekableStreamSupervisorTuningConfig()
@@ -1177,6 +1202,12 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
{
// do nothing
}
+
+ @Override
+ public LagStats computeLagStats()
+ {
+ return null;
+ }
}
private class TestEmittingTestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor
@@ -1220,6 +1251,12 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
}
@Override
+ public LagStats computeLagStats()
+ {
+ return null;
+ }
+
+ @Override
protected void scheduleReporting(ScheduledExecutorService reportingExec)
{
SeekableStreamSupervisorIOConfig ioConfig = spec.getIoConfig();
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
index 5c2a18e..c38b67a 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
@@ -74,10 +74,14 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
private static final String QUERIES_FILE = "/stream/queries/stream_index_queries.json";
private static final String SUPERVISOR_SPEC_TEMPLATE_FILE = "supervisor_spec_template.json";
+ private static final String SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE = "supervisor_with_autoscaler_spec_template.json";
protected static final String DATA_RESOURCE_ROOT = "/stream/data";
protected static final String SUPERVISOR_SPEC_TEMPLATE_PATH =
String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_SPEC_TEMPLATE_FILE);
+ protected static final String SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_PATH =
+ String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE);
+
protected static final String SERIALIZER_SPEC_DIR = "serializer";
protected static final String INPUT_FORMAT_SPEC_DIR = "input_format";
protected static final String INPUT_ROW_PARSER_SPEC_DIR = "parser";
@@ -294,6 +298,70 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
}
}
+ protected void doTestIndexDataWithAutoscaler(@Nullable Boolean transactionEnabled) throws Exception
+ {
+ final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
+ INPUT_FORMAT,
+ getResourceAsString(JSON_INPUT_FORMAT_PATH)
+ );
+ try (
+ final Closeable closer = createResourceCloser(generatedTestConfig);
+ final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
+ ) {
+ final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
+ .apply(getResourceAsString(SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_PATH));
+ LOG.info("supervisorSpec: [%s]\n", taskSpec);
+ // Start supervisor
+ generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
+ LOG.info("Submitted supervisor");
+ // Start generating half of the data
+ int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
+ int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
+ secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+ final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
+ new JsonEventSerializer(jsonMapper),
+ EVENTS_PER_SECOND,
+ CYCLE_PADDING_MS
+ );
+ long numWritten = streamGenerator.run(
+ generatedTestConfig.getStreamName(),
+ streamEventWriter,
+ secondsToGenerateFirstRound,
+ FIRST_EVENT_TIME
+ );
+ // Verify that the supervisor is healthy
+ ITRetryUtil.retryUntil(
+ () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
+ true,
+ 10000,
+ 30,
+ "Waiting for supervisor to be healthy"
+ );
+
+ // Wait for the autoscaler to scale the number of running tasks from 1 to 2.
+ ITRetryUtil.retryUntil(
+ () -> indexer.getRunningTasks().size() == 2,
+ true,
+ 10000,
+ 50,
+ "Waiting for the autoscaler to scale the task count from 1 to 2"
+ );
+
+ // Start generating the remaining half of the data
+ numWritten += streamGenerator.run(
+ generatedTestConfig.getStreamName(),
+ streamEventWriter,
+ secondsToGenerateRemaining,
+ FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)
+ );
+
+ // Verify that supervisor can catch up with the stream
+ verifyIngestedData(generatedTestConfig, numWritten);
+ }
+ }
+
+
+
protected void doTestIndexDataWithStreamReshardSplit(@Nullable Boolean transactionEnabled) throws Exception
{
// Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT * 2
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
index 2c648ea..967ff52 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
@@ -57,6 +57,16 @@ public class ITKafkaIndexingServiceNonTransactionalParallelizedTest extends Abst
* and supervisor maintained and scoped within this test only
*/
@Test
+ public void testKafkaIndexDataWithWithAutoscaler() throws Exception
+ {
+ doTestIndexDataWithAutoscaler(false);
+ }
+
+ /**
+ * This test can be run concurrently with other tests as it creates/modifies/tears down a unique datasource
+ * and supervisor maintained and scoped within this test only
+ */
+ @Test
public void testKafkaIndexDataWithKafkaReshardSplit() throws Exception
{
doTestIndexDataWithStreamReshardSplit(false);
diff --git a/integration-tests/src/test/resources/stream/data/supervisor_with_autoscaler_spec_template.json b/integration-tests/src/test/resources/stream/data/supervisor_with_autoscaler_spec_template.json
new file mode 100644
index 0000000..f2fa828
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/supervisor_with_autoscaler_spec_template.json
@@ -0,0 +1,73 @@
+{
+ "type": "%%STREAM_TYPE%%",
+ "dataSchema": {
+ "dataSource": "%%DATASOURCE%%",
+ "parser": %%PARSER%%,
+ "timestampSpec": {
+ "column": "timestamp",
+ "format": "auto"
+ },
+ "dimensionsSpec": {
+ "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"],
+ "dimensionExclusions": [],
+ "spatialDimensions": []
+ },
+ "metricsSpec": [
+ {
+ "type": "count",
+ "name": "count"
+ },
+ {
+ "type": "doubleSum",
+ "name": "added",
+ "fieldName": "added"
+ },
+ {
+ "type": "doubleSum",
+ "name": "deleted",
+ "fieldName": "deleted"
+ },
+ {
+ "type": "doubleSum",
+ "name": "delta",
+ "fieldName": "delta"
+ }
+ ],
+ "granularitySpec": {
+ "type": "uniform",
+ "segmentGranularity": "MINUTE",
+ "queryGranularity": "NONE"
+ }
+ },
+ "tuningConfig": {
+ "type": "%%STREAM_TYPE%%",
+ "intermediatePersistPeriod": "PT30S",
+ "maxRowsPerSegment": 5000000,
+ "maxRowsInMemory": 500000
+ },
+ "ioConfig": {
+ "%%TOPIC_KEY%%": "%%TOPIC_VALUE%%",
+ "%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%,
+ "autoScalerConfig": {
+ "enableTaskAutoScaler": true,
+ "lagCollectionIntervalMillis": 500,
+ "lagCollectionRangeMillis": 500,
+ "scaleOutThreshold": 0,
+ "triggerScaleOutFractionThreshold": 0.0,
+ "scaleInThreshold": 1000000,
+ "triggerScaleInFractionThreshold": 0.9,
+ "scaleActionStartDelayMillis": 0,
+ "scaleActionPeriodMillis": 100,
+ "taskCountMax": 2,
+ "taskCountMin": 1,
+ "scaleInStep": 1,
+ "scaleOutStep": 2,
+ "minTriggerScaleActionFrequencyMillis": 600000
+ },
+ "taskCount": 1,
+ "replicas": 1,
+ "taskDuration": "PT30S",
+ "%%USE_EARLIEST_KEY%%": true,
+ "inputFormat" : %%INPUT_FORMAT%%
+ }
+}
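The `autoScalerConfig` block in this template tunes a lag-driven scaler. As an illustration only, the sketch below shows one plausible reading of those keys: keep a sliding window of lag samples, scale out by `scaleOutStep` (capped at `taskCountMax`) when at least `triggerScaleOutFractionThreshold` of the samples are at or above `scaleOutThreshold`, and otherwise scale in by `scaleInStep` (floored at `taskCountMin`) when at least `triggerScaleInFractionThreshold` of the samples are at or below `scaleInThreshold`. The class name `LagScaleDecisionSketch`, the window-size formula, and the decision order are assumptions, not the algorithm added by this commit.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical lag-based scaling decision. Parameter names mirror the autoScalerConfig keys
 * in the template above; the logic is an illustrative reading of them, not Druid's code.
 */
final class LagScaleDecisionSketch
{
  private final int windowSize; // number of lag samples considered per decision
  private final long scaleOutThreshold;
  private final double triggerScaleOutFractionThreshold;
  private final long scaleInThreshold;
  private final double triggerScaleInFractionThreshold;
  private final int taskCountMin;
  private final int taskCountMax;
  private final int scaleOutStep;
  private final int scaleInStep;
  private final Deque<Long> lagSamples = new ArrayDeque<>();

  LagScaleDecisionSketch(
      long lagCollectionIntervalMillis,
      long lagCollectionRangeMillis,
      long scaleOutThreshold,
      double triggerScaleOutFractionThreshold,
      long scaleInThreshold,
      double triggerScaleInFractionThreshold,
      int taskCountMin,
      int taskCountMax,
      int scaleOutStep,
      int scaleInStep
  )
  {
    // Assumed: one sample per interval across the range, counting both ends.
    this.windowSize = (int) (lagCollectionRangeMillis / lagCollectionIntervalMillis) + 1;
    this.scaleOutThreshold = scaleOutThreshold;
    this.triggerScaleOutFractionThreshold = triggerScaleOutFractionThreshold;
    this.scaleInThreshold = scaleInThreshold;
    this.triggerScaleInFractionThreshold = triggerScaleInFractionThreshold;
    this.taskCountMin = taskCountMin;
    this.taskCountMax = taskCountMax;
    this.scaleOutStep = scaleOutStep;
    this.scaleInStep = scaleInStep;
  }

  /** Records the latest total lag, dropping the oldest sample once the window is full. */
  void recordLag(long totalLag)
  {
    if (lagSamples.size() == windowSize) {
      lagSamples.removeFirst();
    }
    lagSamples.addLast(totalLag);
  }

  /** Returns the desired task count, or -1 when no scaling action should be taken. */
  int desiredTaskCount(int currentTaskCount)
  {
    if (lagSamples.size() < windowSize) {
      return -1; // not enough history yet
    }
    long beyond = lagSamples.stream().filter(lag -> lag >= scaleOutThreshold).count();
    long within = lagSamples.stream().filter(lag -> lag <= scaleInThreshold).count();

    if ((double) beyond / windowSize >= triggerScaleOutFractionThreshold) {
      // Scale-out condition holds: grow toward taskCountMax and never scale in this round.
      return currentTaskCount >= taskCountMax
             ? -1
             : Math.min(currentTaskCount + scaleOutStep, taskCountMax);
    }
    if ((double) within / windowSize >= triggerScaleInFractionThreshold) {
      return currentTaskCount <= taskCountMin
             ? -1
             : Math.max(currentTaskCount - scaleInStep, taskCountMin);
    }
    return -1;
  }
}
```

Under this reading, the template's values (`scaleOutThreshold` 0 with `triggerScaleOutFractionThreshold` 0.0) make the scale-out branch fire as soon as the window fills, and `taskCountMax` 2 caps the `scaleOutStep` of 2 at two tasks, which is consistent with the integration test waiting for exactly two running tasks. A real implementation would likely also honor `minTriggerScaleActionFrequencyMillis` as a cooldown and schedule decisions using `scaleActionStartDelayMillis` and `scaleActionPeriodMillis`; those are omitted here.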
diff --git a/pom.xml b/pom.xml
index 24f6993..157aec2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -958,6 +958,11 @@
<version>4.5.1</version>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-collections4</artifactId>
+ <version>4.2</version>
+ </dependency>
+ <dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${dropwizard.metrics.version}</version>
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index a2aa29f..b19aeaa 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -154,6 +155,18 @@ public class NoopSupervisorSpec implements SupervisorSpec
{
}
+
+ @Override
+ public LagStats computeLagStats()
+ {
+ return new LagStats(0, 0, 0);
+ }
+
+ @Override
+ public int getActiveTaskGroupsCount()
+ {
+ return -1;
+ }
};
}
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index 83c661a..66d1139 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.overlord.supervisor;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import javax.annotation.Nullable;
import java.util.Map;
@@ -64,4 +65,12 @@ public interface Supervisor
* @param checkpointMetadata metadata for the sequence to currently checkpoint
*/
void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);
+
+ /**
+ * Computes maxLag, totalLag and avgLag for this supervisor.
+ * Currently only supported for Kafka ingestion.
+ */
+ LagStats computeLagStats();
+
+ int getActiveTaskGroupsCount();
}
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
index 041156c..9b44cd0 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.overlord.supervisor;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import java.util.List;
@@ -40,6 +41,11 @@ public interface SupervisorSpec
*/
Supervisor createSupervisor();
+ default SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
+ {
+ return null;
+ }
+
List<String> getDataSources();
default SupervisorSpec createSuspendedSpec()
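Because `createAutoscaler` is added as a default method returning `null`, existing `SupervisorSpec` implementations keep compiling, but any caller has to treat a `null` autoscaler as "autoscaling not supported". The sketch below illustrates that contract with minimal local stand-ins; `SketchSupervisor`, `SketchAutoScaler`, `SketchSupervisorSpec` and `SupervisorStartupSketch` are hypothetical names, and this is not how Druid's supervisor manager is actually written.

```java
// Minimal local stand-ins for the interfaces in this diff (names are hypothetical).
interface SketchSupervisor
{
  void start();
}

interface SketchAutoScaler
{
  void start();

  void stop();
}

interface SketchSupervisorSpec
{
  SketchSupervisor createSupervisor();

  // Mirrors the default added to SupervisorSpec: no autoscaler unless a spec opts in.
  default SketchAutoScaler createAutoscaler(SketchSupervisor supervisor)
  {
    return null;
  }
}

final class SupervisorStartupSketch
{
  private SupervisorStartupSketch()
  {
  }

  /**
   * Starts the supervisor, then its autoscaler if the spec provides one.
   * Returns the started autoscaler, or null when the spec does not support autoscaling.
   */
  static SketchAutoScaler startWithOptionalAutoscaler(SketchSupervisorSpec spec)
  {
    SketchSupervisor supervisor = spec.createSupervisor();
    supervisor.start();
    SketchAutoScaler autoscaler = spec.createAutoscaler(supervisor);
    if (autoscaler != null) { // the default implementation returns null
      autoscaler.start();
    }
    return autoscaler;
  }
}
```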
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
new file mode 100644
index 0000000..7b6e5fd
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.supervisor.autoscaler;
+
+public class LagStats
+{
+ private final long maxLag;
+ private final long totalLag;
+ private final long avgLag;
+
+ public LagStats(long maxLag, long totalLag, long avgLag)
+ {
+ this.maxLag = maxLag;
+ this.totalLag = totalLag;
+ this.avgLag = avgLag;
+ }
+
+ public long getMaxLag()
+ {
+ return maxLag;
+ }
+
+ public long getTotalLag()
+ {
+ return totalLag;
+ }
+
+ public long getAvgLag()
+ {
+ return avgLag;
+ }
+}
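`LagStats` only carries the three aggregates that the `Supervisor.computeLagStats()` Javadoc mentions. As an illustration of how they might be derived, the sketch below aggregates a map of per-partition lags into max, total and average. `LagSummary` and `fromPartitionLags` are hypothetical; the integer-division average is an assumption motivated by `avgLag` being a `long`, and the empty case returns the same zeros that `NoopSupervisorSpec` reports.

```java
import java.util.Map;

/** Local stand-in with the same three fields as LagStats, so the sketch compiles alone. */
final class LagSummary
{
  final long maxLag;
  final long totalLag;
  final long avgLag;

  LagSummary(long maxLag, long totalLag, long avgLag)
  {
    this.maxLag = maxLag;
    this.totalLag = totalLag;
    this.avgLag = avgLag;
  }

  /**
   * Hypothetical helper: aggregates per-partition lag into max, total and average.
   * The average uses integer division because LagStats stores it as a long.
   */
  static LagSummary fromPartitionLags(Map<?, Long> partitionLags)
  {
    if (partitionLags.isEmpty()) {
      return new LagSummary(0, 0, 0); // same values NoopSupervisorSpec reports
    }
    long max = Long.MIN_VALUE;
    long total = 0;
    for (long lag : partitionLags.values()) {
      max = Math.max(max, lag);
      total += lag;
    }
    return new LagSummary(max, total, total / partitionLags.size());
  }
}
```

For partition lags of 10, 30 and 6, this yields a max of 30, a total of 46 and a truncated average of 15.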
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
new file mode 100644
index 0000000..c921e27
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.supervisor.autoscaler;
+
+public interface SupervisorTaskAutoScaler
+{
+ void start();
+ void stop();
+ void reset();
+}
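`SupervisorTaskAutoScaler` exposes only a lifecycle (`start`, `stop`, `reset`). The sketch below shows one way such a lifecycle could be backed by a `ScheduledExecutorService` that samples lag periodically and keeps a bounded history. `TaskAutoScaler`, `ScheduledLagSampler` and the `LongSupplier` lag source are hypothetical; a real autoscaler would also schedule a job that turns the samples into scaling actions, which is omitted here.

```java
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Local copy of the SupervisorTaskAutoScaler contract above. */
interface TaskAutoScaler
{
  void start();

  void stop();

  void reset();
}

/**
 * Hypothetical skeleton: start() schedules periodic lag sampling, stop() cancels it and
 * reset() discards collected samples. The scale-decision job is intentionally omitted.
 */
final class ScheduledLagSampler implements TaskAutoScaler
{
  private final LongSupplier lagSource;
  private final long intervalMillis;
  private final int maxSamples;
  private final ConcurrentLinkedDeque<Long> samples = new ConcurrentLinkedDeque<>();
  private ScheduledExecutorService exec; // guarded by "this"

  ScheduledLagSampler(LongSupplier lagSource, long intervalMillis, int maxSamples)
  {
    this.lagSource = lagSource;
    this.intervalMillis = intervalMillis;
    this.maxSamples = maxSamples;
  }

  @Override
  public synchronized void start()
  {
    if (exec != null) {
      return; // already running
    }
    exec = Executors.newSingleThreadScheduledExecutor();
    exec.scheduleAtFixedRate(this::sampleOnce, 0, intervalMillis, TimeUnit.MILLISECONDS);
  }

  @Override
  public synchronized void stop()
  {
    if (exec != null) {
      exec.shutdownNow();
      exec = null;
    }
  }

  @Override
  public void reset()
  {
    samples.clear();
  }

  /** Takes one lag sample and trims the deque to the most recent maxSamples entries. */
  void sampleOnce()
  {
    samples.addLast(lagSource.getAsLong());
    while (samples.size() > maxSamples) {
      samples.pollFirst();
    }
  }

  int sampleCount()
  {
    return samples.size();
  }
}
```

Note that with `scheduleAtFixedRate`, an exception thrown by the lag source suppresses all later runs of that task, so a production version would probably catch and log inside `sampleOnce`.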
diff --git a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
new file mode 100644
index 0000000..fd5fac0
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing;
+
+import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.Callable;
+
+public class NoopSupervisorSpecTest
+{
+ @Test
+ public void testNoopSupervisorSpecWithAutoscaler()
+ {
+ Exception e = null;
+ try {
+ NoopSupervisorSpec noopSupervisorSpec = new NoopSupervisorSpec(null, Collections.singletonList("datasource1"));
+ Supervisor supervisor = noopSupervisorSpec.createSupervisor();
+ SupervisorTaskAutoScaler autoscaler = noopSupervisorSpec.createAutoscaler(supervisor);
+ Assert.assertNull(autoscaler);
+ Callable<Integer> noop = new Callable<Integer>() {
+ @Override
+ public Integer call()
+ {
+ return -1;
+ }
+ };
+
+ int count = supervisor.getActiveTaskGroupsCount();
+ Assert.assertEquals(-1, count);
+
+ LagStats lagStats = supervisor.computeLagStats();
+ long totalLag = lagStats.getTotalLag();
+ long avgLag = lagStats.getAvgLag();
+ long maxLag = lagStats.getMaxLag();
+ Assert.assertEquals(0, totalLag);
+ Assert.assertEquals(0, avgLag);
+ Assert.assertEquals(0, maxLag);
+ }
+ catch (Exception ex) {
+ e = ex;
+ }
+ Assert.assertNull(e);
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java
index 9332220..5c40757 100644
--- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.VersionedSupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.junit.After;
@@ -186,6 +187,12 @@ public class SQLMetadataSupervisorManagerTest
}
@Override
+ public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
+ {
+ return null;
+ }
+
+ @Override
public List<String> getDataSources()
{
return Collections.singletonList(dataSource);
diff --git a/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java b/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java
index e935a41..ffacfa2 100644
--- a/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java
+++ b/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import java.util.List;
import java.util.Objects;
@@ -53,6 +54,12 @@ public class TestSupervisorSpec implements SupervisorSpec
}
@Override
+ public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
+ {
+ return null;
+ }
+
+ @Override
public List<String> getDataSources()
{
return null;