You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by pj...@apache.org on 2021/03/06 09:07:25 UTC

[druid] branch master updated: Dynamic auto scale Kafka-Stream ingest tasks (#10524)

This is an automated email from the ASF dual-hosted git repository.

pjain1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new bddacbb  Dynamic auto scale Kafka-Stream ingest tasks (#10524)
bddacbb is described below

commit bddacbb1c3abccf6ad035a4756a6960761fd43a2
Author: zhangyue19921010 <69...@users.noreply.github.com>
AuthorDate: Sat Mar 6 17:06:52 2021 +0800

    Dynamic auto scale Kafka-Stream ingest tasks (#10524)
    
    * druid task auto scale based on kafka lag
    
    * fix kafkaSupervisorIOConfig and KinesisSupervisorIOConfig
    
    * druid task auto scale based on kafka lag
    
    * fix kafkaSupervisorIOConfig and KinesisSupervisorIOConfig
    
    * test dynamic auto scale done
    
    * auto scale tasks tested on prd cluster
    
    * auto scale tasks tested on prd cluster
    
    * modify code style to solve 29055.10 29055.9 29055.17 29055.18 29055.19 29055.20
    
    * rename test fiel function
    
    * change codes and add docs based on capistrant reviewed
    
    * midify test docs
    
    * modify docs
    
    * modify docs
    
    * modify docs
    
    * merge from master
    
    * Extract the autoScale logic out of SeekableStreamSupervisor to minimize putting more stuff inside there &&  Make autoscaling algorithm configurable and scalable.
    
    * fix ci failed
    
    * revert msic.xml
    
    * add uts to test autoscaler create && scale out/in and kafka ingest with scale enable
    
    * add more uts
    
    * fix inner class check
    
    * add IT for kafka ingestion with autoscaler
    
    * add new IT in groups=kafka-index named testKafkaIndexDataWithWithAutoscaler
    
    * review change
    
    * code review
    
    * remove unused imports
    
    * fix NLP
    
    * fix docs and UTs
    
    * revert misc.xml
    
    * use jackson to build autoScaleConfig with default values
    
    * add uts
    
    * use jackson to init AutoScalerConfig in IOConfig instead of Map<>
    
    * autoscalerConfig interface and provide a defaultAutoScalerConfig
    
    * modify uts
    
    * modify docs
    
    * fix checkstyle
    
    * revert misc.xml
    
    * modify uts
    
    * reviewed code change
    
    * reviewed code change
    
    * code reviewed
    
    * code review
    
    * log changed
    
    * do StringUtils.encodeForFormat when create allocationExec
    
    * code review && limit taskCountMax to partitionNumbers
    
    * modify docs
    
    * code review
    
    Co-authored-by: yuezhang <yu...@freewheel.tv>
---
 .../development/extensions-core/kafka-ingestion.md | 109 +++
 .../MaterializedViewSupervisor.java                |  13 +
 .../MaterializedViewSupervisorSpecTest.java        |  82 ++
 .../indexing/kafka/supervisor/KafkaSupervisor.java |  13 +
 .../kafka/supervisor/KafkaSupervisorIOConfig.java  |   5 +
 .../druid/indexing/kafka/KafkaSamplerSpecTest.java |   1 +
 .../kafka/supervisor/KafkaSupervisorTest.java      | 199 +++++
 .../kinesis/supervisor/KinesisSupervisor.java      |   8 +
 .../supervisor/KinesisSupervisorIOConfig.java      |  12 +
 .../indexing/kinesis/KinesisSamplerSpecTest.java   |   1 +
 .../kinesis/supervisor/KinesisSupervisorTest.java  |  73 ++
 indexing-service/pom.xml                           |   6 +-
 .../overlord/supervisor/SupervisorManager.java     |  31 +
 .../supervisor/SeekableStreamSupervisor.java       | 210 ++++-
 .../SeekableStreamSupervisorIOConfig.java          |  26 +-
 .../supervisor/SeekableStreamSupervisorSpec.java   |  18 +
 .../supervisor/autoscaler/AutoScalerConfig.java    |  43 +
 .../supervisor/autoscaler/LagBasedAutoScaler.java  | 244 ++++++
 .../autoscaler/LagBasedAutoScalerConfig.java       | 208 +++++
 .../supervisor/autoscaler/NoopTaskAutoScaler.java  |  47 +
 .../OverlordSecurityResourceFilterTest.java        |   8 +
 .../supervisor/SupervisorResourceTest.java         |   7 +
 .../SeekableStreamSupervisorSpecTest.java          | 950 +++++++++++++++++++++
 .../SeekableStreamSupervisorStateTest.java         |  39 +-
 .../tests/indexer/AbstractStreamIndexingTest.java  |  68 ++
 ...ingServiceNonTransactionalParallelizedTest.java |  10 +
 .../supervisor_with_autoscaler_spec_template.json  |  73 ++
 pom.xml                                            |   5 +
 .../overlord/supervisor/NoopSupervisorSpec.java    |  13 +
 .../indexing/overlord/supervisor/Supervisor.java   |   9 +
 .../overlord/supervisor/SupervisorSpec.java        |   6 +
 .../overlord/supervisor/autoscaler/LagStats.java   |  49 ++
 .../autoscaler/SupervisorTaskAutoScaler.java       |  27 +
 .../druid/indexing/NoopSupervisorSpecTest.java     |  67 ++
 .../metadata/SQLMetadataSupervisorManagerTest.java |   7 +
 .../apache/druid/metadata/TestSupervisorSpec.java  |   7 +
 36 files changed, 2671 insertions(+), 23 deletions(-)

diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md
index 7b94a99..0dd3167 100644
--- a/docs/development/extensions-core/kafka-ingestion.md
+++ b/docs/development/extensions-core/kafka-ingestion.md
@@ -146,6 +146,115 @@ A sample supervisor spec is shown below:
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a [...]
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime an [...]
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageReject [...]
+|`autoScalerConfig`|Object|Defines auto scaling behavior for Kafka ingest tasks. Currently, only supported for Kafka indexing. See [Task Autoscaler Properties](#task-autoscaler-properties) for details.|no (default == null)|
+
+### Task Autoscaler Properties
+
+> Note that Task AutoScaler is currently designated as experimental.
+
+| Property | Description | Required |
+| ------------- | ------------- | ------------- |
+| `enableTaskAutoScaler` | Whether to enable this feature. Setting this to false, or omitting it, disables the `autoScaler` even if `autoScalerConfig` is not null.| no (default == false) |
+| `taskCountMax` | Maximum value of task count. Make Sure `taskCountMax >= taskCountMin`. If `taskCountMax > {numKafkaPartitions}`, the maximum number of reading tasks would be equal to `{numKafkaPartitions}` and `taskCountMax` would be ignored.  | yes |
+| `taskCountMin` | Minimum number of ingestion tasks. When the autoscaler is enabled, the value of `taskCount` in `IOConfig` is ignored; ingestion starts with `taskCountMin` tasks and may scale up to `taskCountMax`.| yes |
+| `minTriggerScaleActionFrequencyMillis` | Minimum time interval between two scale actions | no (default == 600000) |
+| `autoScalerStrategy` | The algorithm of `autoScaler`. Only `lagBased` is supported for now. See [Lag Based AutoScaler Strategy Related Properties](#lag-based-autoscaler-strategy-related-properties) for details.| no (default == `lagBased`) |
+
+### Lag Based AutoScaler Strategy Related Properties
+| Property | Description | Required |
+| ------------- | ------------- | ------------- |
+| `lagCollectionIntervalMillis` | Period of lag points collection.  | no (default == 30000) |
+| `lagCollectionRangeMillis` | The total time window of lag collection. Used together with `lagCollectionIntervalMillis`: within the most recent `lagCollectionRangeMillis`, lag metric points are collected every `lagCollectionIntervalMillis`. | no (default == 600000) |
+| `scaleOutThreshold` | The Threshold of scale out action | no (default == 6000000) |
+| `triggerScaleOutFractionThreshold` | If the fraction of collected lag points that are higher than `scaleOutThreshold` exceeds `triggerScaleOutFractionThreshold`, then do a scale out action. | no (default == 0.3) |
+| `scaleInThreshold` | The Threshold of scale in action | no (default == 1000000) |
+| `triggerScaleInFractionThreshold` | If the fraction of collected lag points that are lower than `scaleInThreshold` exceeds `triggerScaleInFractionThreshold`, then do a scale in action. | no (default == 0.9) |
+| `scaleActionStartDelayMillis` | Number of milliseconds to delay after supervisor startup before the first scale-logic check runs. | no (default == 300000) |
+| `scaleActionPeriodMillis` | The frequency of checking whether to do scale action in millis | no (default == 60000) |
+| `scaleInStep` | How many tasks to reduce at a time | no (default == 1) |
+| `scaleOutStep` | How many tasks to add at a time | no (default == 2) |
+
+A sample supervisor spec with `lagBased` autoScaler enabled is shown below:
+```json
+{
+    "type": "kafka",
+    "dataSchema": {
+        "dataSource": "metrics-kafka",
+        "timestampSpec": {
+            "column": "timestamp",
+            "format": "auto"
+        },
+        "dimensionsSpec": {
+            "dimensions": [
+
+            ],
+            "dimensionExclusions": [
+                "timestamp",
+                "value"
+            ]
+        },
+        "metricsSpec": [
+            {
+                "name": "count",
+                "type": "count"
+            },
+            {
+                "name": "value_sum",
+                "fieldName": "value",
+                "type": "doubleSum"
+            },
+            {
+                "name": "value_min",
+                "fieldName": "value",
+                "type": "doubleMin"
+            },
+            {
+                "name": "value_max",
+                "fieldName": "value",
+                "type": "doubleMax"
+            }
+        ],
+        "granularitySpec": {
+            "type": "uniform",
+            "segmentGranularity": "HOUR",
+            "queryGranularity": "NONE"
+        }
+    },
+    "ioConfig": {
+        "topic": "metrics",
+        "inputFormat": {
+            "type": "json"
+        },
+        "consumerProperties": {
+            "bootstrap.servers": "localhost:9092"
+        },
+        "autoScalerConfig": {
+            "enableTaskAutoScaler": true,
+            "taskCountMax": 6,
+            "taskCountMin": 2,
+            "minTriggerScaleActionFrequencyMillis": 600000,
+            "autoScalerStrategy": "lagBased",
+            "lagCollectionIntervalMillis": 30000,
+            "lagCollectionRangeMillis": 600000,
+            "scaleOutThreshold": 6000000,
+            "triggerScaleOutFractionThreshold": 0.3,
+            "scaleInThreshold": 1000000,
+            "triggerScaleInFractionThreshold": 0.9,
+            "scaleActionStartDelayMillis": 300000,
+            "scaleActionPeriodMillis": 60000,
+            "scaleInStep": 1,
+            "scaleOutStep": 2
+        },
+        "taskCount":1,
+        "replicas":1,
+        "taskDuration":"PT1H"
+    },
+    "tuningConfig":{
+        "type":"kafka",
+        "maxRowsPerSegment":5000000
+    }
+}
+```
 
 #### More on consumerProperties
 
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index 41647d5..f456cb6 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -37,6 +37,7 @@ import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.supervisor.Supervisor;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.JodaUtils;
@@ -282,6 +283,18 @@ public class MaterializedViewSupervisor implements Supervisor
     // do nothing
   }
 
+  @Override
+  public LagStats computeLagStats()
+  {
+    throw new UnsupportedOperationException("Compute Lag Stats not supported in MaterializedViewSupervisor");
+  }
+
+  @Override
+  public int getActiveTaskGroupsCount()
+  {
+    throw new UnsupportedOperationException("Get Active Task Groups Count is not supported in MaterializedViewSupervisor");
+  }
+
   /**
    * Find intervals in which derived dataSource should rebuild the segments.
    * Choose the latest intervals to create new HadoopIndexTask and submit it.
diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
index c82b8b8..01ebdd0 100644
--- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
+++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
@@ -29,7 +29,9 @@ import org.apache.druid.indexer.HadoopTuningConfig;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.metadata.MetadataSupervisorManager;
 import org.apache.druid.metadata.SqlSegmentsMetadataManager;
@@ -50,6 +52,7 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
+import java.util.concurrent.Callable;
 
 public class MaterializedViewSupervisorSpecTest
 {
@@ -156,6 +159,85 @@ public class MaterializedViewSupervisorSpecTest
   }
 
   @Test
+  public void testMaterializedViewSupervisorSpecCreated()
+  {
+    Exception ex = null;
+
+    try {
+      MaterializedViewSupervisorSpec spec = new MaterializedViewSupervisorSpec(
+              "wikiticker",
+              new DimensionsSpec(
+                      Lists.newArrayList(
+                              new StringDimensionSchema("isUnpatrolled"),
+                              new StringDimensionSchema("metroCode"),
+                              new StringDimensionSchema("namespace"),
+                              new StringDimensionSchema("page"),
+                              new StringDimensionSchema("regionIsoCode"),
+                              new StringDimensionSchema("regionName"),
+                              new StringDimensionSchema("user")
+                      ),
+                      null,
+                      null
+              ),
+              new AggregatorFactory[]{
+                  new CountAggregatorFactory("count"),
+                  new LongSumAggregatorFactory("added", "added")
+              },
+              HadoopTuningConfig.makeDefaultTuningConfig(),
+              null,
+              null,
+              null,
+              null,
+              null,
+              false,
+              objectMapper,
+              null,
+              null,
+              null,
+              null,
+              null,
+              new MaterializedViewTaskConfig(),
+              EasyMock.createMock(AuthorizerMapper.class),
+              new NoopChatHandlerProvider(),
+              new SupervisorStateManagerConfig()
+      );
+      Supervisor supervisor = spec.createSupervisor();
+      Assert.assertTrue(supervisor instanceof MaterializedViewSupervisor);
+
+      SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor);
+      Assert.assertNull(autoscaler);
+
+      try {
+        supervisor.computeLagStats();
+      }
+      catch (Exception e) {
+        Assert.assertTrue(e instanceof UnsupportedOperationException);
+      }
+
+      try {
+        int count = supervisor.getActiveTaskGroupsCount();
+      }
+      catch (Exception e) {
+        Assert.assertTrue(e instanceof UnsupportedOperationException);
+      }
+
+      Callable<Integer> noop = new Callable<Integer>() {
+        @Override
+        public Integer call()
+        {
+          return -1;
+        }
+      };
+
+    }
+    catch (Exception e) {
+      ex = e;
+    }
+
+    Assert.assertNull(ex);
+  }
+
+  @Test
   public void testSuspendResuume() throws IOException
   {
     String supervisorStr = "{\n" +
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index d592f78..a02568a 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -38,6 +38,7 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
@@ -58,6 +59,7 @@ import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -331,6 +333,17 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long, Kaf
   }
 
   @Override
+  public LagStats computeLagStats()
+  {
+    Map<Integer, Long> partitionRecordLag = getPartitionRecordLag();
+    if (partitionRecordLag == null) {
+      return new LagStats(0, 0, 0);
+    }
+
+    return computeLags(partitionRecordLag);
+  }
+
+  @Override
   protected void updatePartitionLagFromStream()
   {
     getRecordSupplierLock().lock();
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
index 62c1e79..87b689e 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java
@@ -24,10 +24,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
 import org.apache.druid.java.util.common.StringUtils;
 import org.joda.time.DateTime;
 import org.joda.time.Period;
 
+import javax.annotation.Nullable;
 import java.util.Map;
 
 public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
@@ -51,6 +53,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
       @JsonProperty("taskCount") Integer taskCount,
       @JsonProperty("taskDuration") Period taskDuration,
       @JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
+      @Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig,
       @JsonProperty("pollTimeout") Long pollTimeout,
       @JsonProperty("startDelay") Period startDelay,
       @JsonProperty("period") Period period,
@@ -73,6 +76,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
         completionTimeout,
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
+        autoScalerConfig,
         lateMessageRejectionStartDateTime
     );
 
@@ -117,6 +121,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
            ", taskCount=" + getTaskCount() +
            ", taskDuration=" + getTaskDuration() +
            ", consumerProperties=" + consumerProperties +
+           ", autoScalerConfig=" + getAutoscalerConfig() +
            ", pollTimeout=" + pollTimeout +
            ", startDelay=" + getStartDelay() +
            ", period=" + getPeriod() +
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index c54a31f..dd712d9 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -134,6 +134,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
             null,
             null,
             null,
+            null,
             true,
             null,
             null,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 1dbabe8..ac6b6e5 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -61,13 +61,16 @@ import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
 import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -158,6 +161,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
   private RowIngestionMetersFactory rowIngestionMetersFactory;
   private ExceptionCapturingServiceEmitter serviceEmitter;
   private SupervisorStateManagerConfig supervisorConfig;
+  private KafkaSupervisorIngestionSpec ingestionSchema;
 
   private static String getTopic()
   {
@@ -214,6 +218,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     serviceEmitter = new ExceptionCapturingServiceEmitter();
     EmittingLogger.registerEmitter(serviceEmitter);
     supervisorConfig = new SupervisorStateManagerConfig();
+    ingestionSchema = EasyMock.createMock(KafkaSupervisorIngestionSpec.class);
   }
 
   @After
@@ -233,6 +238,197 @@ public class KafkaSupervisorTest extends EasyMockSupport
   }
 
   @Test
+  public void testNoInitialStateWithAutoscaler() throws Exception
+  {
+    KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(
+            null,
+            null
+    )
+    {
+      @Override
+      public KafkaIndexTaskClient build(
+              TaskInfoProvider taskInfoProvider,
+              String dataSource,
+              int numThreads,
+              Duration httpTimeout,
+              long numRetries
+      )
+      {
+        Assert.assertEquals(TEST_CHAT_THREADS, numThreads);
+        Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout);
+        Assert.assertEquals(TEST_CHAT_RETRIES, numRetries);
+        return taskClient;
+      }
+    };
+
+    HashMap<String, Object> autoScalerConfig = new HashMap<>();
+    autoScalerConfig.put("enableTaskAutoScaler", true);
+    autoScalerConfig.put("lagCollectionIntervalMillis", 500);
+    autoScalerConfig.put("lagCollectionRangeMillis", 500);
+    autoScalerConfig.put("scaleOutThreshold", 0);
+    autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.0);
+    autoScalerConfig.put("scaleInThreshold", 1000000);
+    autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
+    autoScalerConfig.put("scaleActionStartDelayMillis", 0);
+    autoScalerConfig.put("scaleActionPeriodMillis", 100);
+    autoScalerConfig.put("taskCountMax", 2);
+    autoScalerConfig.put("taskCountMin", 1);
+    autoScalerConfig.put("scaleInStep", 1);
+    autoScalerConfig.put("scaleOutStep", 2);
+    autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
+
+    final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
+    consumerProperties.put("myCustomKey", "myCustomValue");
+    consumerProperties.put("bootstrap.servers", kafkaHost);
+
+    KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
+            topic,
+            INPUT_FORMAT,
+            1,
+            1,
+            new Period("PT1H"),
+            consumerProperties,
+            OBJECT_MAPPER.convertValue(autoScalerConfig, LagBasedAutoScalerConfig.class),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            new Period("P1D"),
+            new Period("PT30S"),
+            true,
+            new Period("PT30M"),
+            null,
+            null,
+            null
+    );
+
+    final KafkaSupervisorTuningConfig tuningConfigOri = new KafkaSupervisorTuningConfig(
+            null,
+            1000,
+            null,
+            null,
+            50000,
+            null,
+            new Period("P1Y"),
+            new File("/test"),
+            null,
+            null,
+            null,
+            true,
+            false,
+            null,
+            false,
+            null,
+            numThreads,
+            TEST_CHAT_THREADS,
+            TEST_CHAT_RETRIES,
+            TEST_HTTP_TIMEOUT,
+            TEST_SHUTDOWN_TIMEOUT,
+            null,
+            null,
+            null,
+            null,
+            null
+    );
+
+    EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(kafkaSupervisorIOConfig).anyTimes();
+    EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(tuningConfigOri).anyTimes();
+    EasyMock.replay(ingestionSchema);
+
+    SeekableStreamSupervisorSpec testableSupervisorSpec = new KafkaSupervisorSpec(
+            ingestionSchema,
+            dataSchema,
+            tuningConfigOri,
+            kafkaSupervisorIOConfig,
+            null,
+            false,
+            taskStorage,
+            taskMaster,
+            indexerMetadataStorageCoordinator,
+            taskClientFactory,
+            OBJECT_MAPPER,
+            new NoopServiceEmitter(),
+            new DruidMonitorSchedulerConfig(),
+            rowIngestionMetersFactory,
+            new SupervisorStateManagerConfig()
+    );
+
+    supervisor = new TestableKafkaSupervisor(
+            taskStorage,
+            taskMaster,
+            indexerMetadataStorageCoordinator,
+            taskClientFactory,
+            OBJECT_MAPPER,
+            (KafkaSupervisorSpec) testableSupervisorSpec,
+            rowIngestionMetersFactory
+    );
+
+    SupervisorTaskAutoScaler autoscaler = testableSupervisorSpec.createAutoscaler(supervisor);
+
+
+    final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
+    addSomeEvents(1);
+
+    Capture<KafkaIndexTask> captured = Capture.newInstance();
+    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+            new KafkaDataSourceMetadata(
+                    null
+            )
+    ).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
+    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
+    replayAll();
+
+    supervisor.start();
+    int taskCountBeforeScale = supervisor.getIoConfig().getTaskCount();
+    Assert.assertEquals(1, taskCountBeforeScale);
+    autoscaler.start();
+    supervisor.runInternal();
+    Thread.sleep(1 * 1000);
+    verifyAll();
+
+    int taskCountAfterScale = supervisor.getIoConfig().getTaskCount();
+    Assert.assertEquals(2, taskCountAfterScale);
+
+
+    KafkaIndexTask task = captured.getValue();
+    Assert.assertEquals(KafkaSupervisorTest.dataSchema, task.getDataSchema());
+    Assert.assertEquals(tuningConfig.convertToTaskTuningConfig(), task.getTuningConfig());
+
+    KafkaIndexTaskIOConfig taskConfig = task.getIOConfig();
+    Assert.assertEquals(kafkaHost, taskConfig.getConsumerProperties().get("bootstrap.servers"));
+    Assert.assertEquals("myCustomValue", taskConfig.getConsumerProperties().get("myCustomKey"));
+    Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
+    Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
+    Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent());
+    Assert.assertFalse("maximumMessageTime", taskConfig.getMaximumMessageTime().isPresent());
+
+    Assert.assertEquals(topic, taskConfig.getStartSequenceNumbers().getStream());
+    Assert.assertEquals(0L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(0));
+    Assert.assertEquals(0L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(1));
+    Assert.assertEquals(0L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(2));
+
+    Assert.assertEquals(topic, taskConfig.getEndSequenceNumbers().getStream());
+    Assert.assertEquals(
+            Long.MAX_VALUE,
+            (long) taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(0)
+    );
+    Assert.assertEquals(
+            Long.MAX_VALUE,
+            (long) taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(1)
+    );
+    Assert.assertEquals(
+            Long.MAX_VALUE,
+            (long) taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(2)
+    );
+
+    autoscaler.reset();
+    autoscaler.stop();
+  }
+
+  @Test
   public void testCreateBaseTaskContexts() throws JsonProcessingException
   {
     supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null);
@@ -3379,6 +3575,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         taskCount,
         new Period(duration),
         consumerProperties,
+        null,
         KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
         new Period("P1D"),
         new Period("PT30S"),
@@ -3491,6 +3688,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         taskCount,
         new Period(duration),
         consumerProperties,
+        null,
         KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
         new Period("P1D"),
         new Period("PT30S"),
@@ -3607,6 +3805,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         taskCount,
         new Period(duration),
         consumerProperties,
+        null,
         KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
         new Period("P1D"),
         new Period("PT30S"),
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index a7bc599..92defd2 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -39,6 +39,7 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
 import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
@@ -378,6 +379,13 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
     return true;
   }
 
+  // Not yet supported; may be implemented in the future once a proper way to measure Kinesis lag is found.
+  @Override
+  public LagStats computeLagStats()
+  {
+    throw new UnsupportedOperationException("Compute Lag Stats is not supported in KinesisSupervisor yet.");
+  }
+
   @Override
   protected Map<String, OrderedSequenceNumber<String>> filterExpiredPartitionsFromStartingOffsets(
       Map<String, OrderedSequenceNumber<String>> startingOffsets
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
index f68e0b7..b43cece 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
@@ -26,9 +26,12 @@ import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.indexing.kinesis.KinesisIndexTaskIOConfig;
 import org.apache.druid.indexing.kinesis.KinesisRegion;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
 import org.joda.time.DateTime;
 import org.joda.time.Period;
 
+import javax.annotation.Nullable;
+
 public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
 {
   private final String endpoint;
@@ -70,6 +73,7 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
       @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
       @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
       @JsonProperty("awsExternalId") String awsExternalId,
+      @Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig,
       @JsonProperty("deaggregate") boolean deaggregate
   )
   {
@@ -85,8 +89,16 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
         completionTimeout,
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
+        autoScalerConfig,
         lateMessageRejectionStartDateTime
     );
+
+    // Dynamic task allocation is not supported for Kinesis yet.
+    // Throw UnsupportedOperationException in case someone sets this on a Kinesis supervisor spec.
+    if (autoScalerConfig != null) {
+      throw new UnsupportedOperationException("Tasks auto scaler for kinesis is not supported yet. Please remove autoScalerConfig or set it to null!");
+    }
+
     this.endpoint = endpoint != null
                     ? endpoint
                     : (region != null ? region.getEndpoint() : KinesisRegion.US_EAST_1.getEndpoint());
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
index 0b9ebdb..e17ce1e 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
@@ -153,6 +153,7 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
             null,
             null,
             null,
+            null,
             false
         ),
         null,
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 3f35204..5d1d0f6 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -66,6 +66,7 @@ import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
 import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
@@ -102,6 +103,7 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -304,6 +306,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
         1000,
         null,
         null,
+        null,
         false
     );
     KinesisIndexTaskClientFactory clientFactory = new KinesisIndexTaskClientFactory(null, OBJECT_MAPPER);
@@ -345,6 +348,72 @@ public class KinesisSupervisorTest extends EasyMockSupport
   }
 
   @Test
+  public void testKinesisIOConfig()
+  {
+    Exception e = null;
+    try {
+      KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
+              STREAM,
+              INPUT_FORMAT,
+              "awsEndpoint",
+              null,
+              1,
+              1,
+              new Period("PT30M"),
+              new Period("P1D"),
+              new Period("PT30S"),
+              false,
+              new Period("PT30M"),
+              null,
+              null,
+              null,
+              100,
+              1000,
+              null,
+              null,
+              null,
+              false
+      );
+      AutoScalerConfig autoScalerConfig = kinesisSupervisorIOConfig.getAutoscalerConfig();
+      Assert.assertNull(autoScalerConfig);
+    }
+    catch (Exception ex) {
+      e = ex;
+    }
+    Assert.assertNull(e);
+
+    try {
+      KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
+              STREAM,
+              INPUT_FORMAT,
+              "awsEndpoint",
+              null,
+              1,
+              1,
+              new Period("PT30M"),
+              new Period("P1D"),
+              new Period("PT30S"),
+              false,
+              new Period("PT30M"),
+              null,
+              null,
+              null,
+              100,
+              1000,
+              null,
+              null,
+              OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class),
+              false
+      );
+    }
+    catch (Exception ex) {
+      e = ex;
+    }
+    Assert.assertNotNull(e);
+    Assert.assertTrue(e instanceof UnsupportedOperationException);
+  }
+
+  @Test
   public void testMultiTask() throws Exception
   {
     supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null);
@@ -4721,6 +4790,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
         null,
         null,
         null,
+        null,
         false
     );
 
@@ -4863,6 +4933,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
         fetchDelayMillis,
         null,
         null,
+        null,
         false
     );
 
@@ -4950,6 +5021,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
         fetchDelayMillis,
         null,
         null,
+        null,
         false
     );
 
@@ -5039,6 +5111,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
         fetchDelayMillis,
         null,
         null,
+        null,
         false
     );
 
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 5564122..be8360b 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -62,7 +62,11 @@
             <artifactId>druid-hll</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+            <version>4.2</version>
+        </dependency>
         <dependency>
             <groupId>io.dropwizard.metrics</groupId>
             <artifactId>metrics-core</artifactId>
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 48153b0..b638fcf 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -23,6 +23,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
@@ -30,6 +31,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.metadata.MetadataSupervisorManager;
 
 import javax.annotation.Nullable;
+
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -44,6 +46,8 @@ public class SupervisorManager
 
   private final MetadataSupervisorManager metadataSupervisorManager;
   private final ConcurrentHashMap<String, Pair<Supervisor, SupervisorSpec>> supervisors = new ConcurrentHashMap<>();
+  // Map of supervisor id -> autoscaler; only supervisors with an autoscaler configured have an entry here.
+  private final ConcurrentHashMap<String, SupervisorTaskAutoScaler> autoscalers = new ConcurrentHashMap<>();
   private final Object lock = new Object();
 
   private volatile boolean started = false;
@@ -54,6 +58,11 @@ public class SupervisorManager
     this.metadataSupervisorManager = metadataSupervisorManager;
   }
 
+  public MetadataSupervisorManager getMetadataSupervisorManager()
+  {
+    return metadataSupervisorManager;
+  }
+
   public Set<String> getSupervisorIds()
   {
     return supervisors.keySet();
@@ -140,12 +149,17 @@ public class SupervisorManager
       for (String id : supervisors.keySet()) {
         try {
           supervisors.get(id).lhs.stop(false);
+          SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
+          if (autoscaler != null) {
+            autoscaler.stop();
+          }
         }
         catch (Exception e) {
           log.warn(e, "Caught exception while stopping supervisor [%s]", id);
         }
       }
       supervisors.clear();
+      autoscalers.clear();
       started = false;
     }
 
@@ -187,6 +201,10 @@ public class SupervisorManager
     }
 
     supervisor.lhs.reset(dataSourceMetadata);
+    SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
+    if (autoscaler != null) {
+      autoscaler.reset();
+    }
     return true;
   }
 
@@ -238,6 +256,12 @@ public class SupervisorManager
     pair.lhs.stop(true);
     supervisors.remove(id);
 
+    SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
+    if (autoscaler != null) {
+      autoscaler.stop();
+      autoscalers.remove(id);
+    }
+
     return true;
   }
 
@@ -282,9 +306,16 @@ public class SupervisorManager
     }
 
     Supervisor supervisor;
+    SupervisorTaskAutoScaler autoscaler;
     try {
       supervisor = spec.createSupervisor();
+      autoscaler = spec.createAutoscaler(supervisor);
+
       supervisor.start();
+      if (autoscaler != null) {
+        autoscaler.start();
+        autoscalers.put(id, autoscaler);
+      }
     }
     catch (Exception e) {
       // Supervisor creation or start failed write tombstone only when trying to start a new supervisor
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index e63f538..11339d4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -56,8 +56,10 @@ import org.apache.druid.indexing.overlord.TaskRunnerListener;
 import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
 import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
@@ -71,6 +73,7 @@ import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamException;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
@@ -82,6 +85,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.metadata.EntryExistsException;
+import org.apache.druid.metadata.MetadataSupervisorManager;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.joda.time.DateTime;
@@ -102,6 +106,7 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
@@ -318,6 +323,127 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
+  // change taskCount without resubmitting.
+  private class DynamicAllocationTasksNotice implements Notice
+  {
+    Callable<Integer> scaleAction;
+
+    DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
+    {
+      this.scaleAction = scaleAction;
+    }
+
+    /**
+     * Collects lag points and checks whether a dynamic scale action is necessary.
+     */
+    @Override
+    public void handle()
+    {
+      if (autoScalerConfig == null) {
+        log.warn("autoScalerConfig is null but a dynamic allocation notice was submitted; this should never happen.");
+      } else {
+        try {
+          long nowTime = System.currentTimeMillis();
+          if (spec.isSuspended()) {
+            log.info("Skipping DynamicAllocationTasksNotice execution because [%s] supervisor is suspended",
+                    dataSource
+            );
+            return;
+          }
+          log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s]", pendingCompletionTaskGroups,
+                  dataSource
+          );
+          for (CopyOnWriteArrayList<TaskGroup> list : pendingCompletionTaskGroups.values()) {
+            if (!list.isEmpty()) {
+              log.info(
+                      "Skipping DynamicAllocationTasksNotice execution for datasource [%s] because following tasks are pending [%s]",
+                      dataSource, pendingCompletionTaskGroups
+              );
+              return;
+            }
+          }
+          if (nowTime - dynamicTriggerLastRunTime < autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
+            log.info(
+                    "DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!",
+                    nowTime - dynamicTriggerLastRunTime, autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(), dataSource
+            );
+            return;
+          }
+          final Integer desiredTaskCount = scaleAction.call();
+          boolean allocationSuccess = changeTaskCount(desiredTaskCount);
+          if (allocationSuccess) {
+            dynamicTriggerLastRunTime = nowTime;
+          }
+        }
+        catch (Exception ex) {
+          log.warn(ex, "Error parsing DynamicAllocationTasksNotice");
+        }
+      }
+    }
+  }
+
+  /**
+   * This method determines how to do scale actions based on collected lag points.
+   * If scale action is triggered :
+   *    First of all, call gracefulShutdownInternal() which will change the state of current datasource ingest tasks from reading to publishing.
+   *    Secondly, clear all the stateful data structures: activelyReadingTaskGroups, partitionGroups, partitionOffsets, pendingCompletionTaskGroups, partitionIds. These structures will be rebuilt in the next 'RunNotice'.
+   *    Finally, change the taskCount in SeekableStreamSupervisorIOConfig and sync it to MetadataStorage.
+   * After the taskCount is changed in SeekableStreamSupervisorIOConfig, next RunNotice will create scaled number of ingest tasks without resubmitting the supervisor.
+   * @param desiredActiveTaskCount desired taskCount computed from AutoScaler
+   * @return Boolean flag indicating if scale action was executed or not. If true, it will wait at least 'minTriggerScaleActionFrequencyMillis' before next 'changeTaskCount'.
+   *         If false, it will do 'changeTaskCount' again after 'scaleActionPeriodMillis' millis.
+   * @throws InterruptedException
+   * @throws ExecutionException
+   * @throws TimeoutException
+   */
+  private boolean changeTaskCount(int desiredActiveTaskCount) throws InterruptedException, ExecutionException, TimeoutException
+  {
+    int currentActiveTaskCount;
+    Collection<TaskGroup> activeTaskGroups = activelyReadingTaskGroups.values();
+    currentActiveTaskCount = activeTaskGroups.size();
+
+    if (desiredActiveTaskCount < 0 || desiredActiveTaskCount == currentActiveTaskCount) {
+      return false;
+    } else {
+      log.info(
+              "Starting scale action, current active task count is [%d] and desired task count is [%d] for dataSource [%s].",
+              currentActiveTaskCount, desiredActiveTaskCount, dataSource
+      );
+      gracefulShutdownInternal();
+      changeTaskCountInIOConfig(desiredActiveTaskCount);
+      clearAllocationInfo();
+      log.info("Changed taskCount to [%s] for dataSource [%s].", desiredActiveTaskCount, dataSource);
+      return true;
+    }
+  }
+
+  private void changeTaskCountInIOConfig(int desiredActiveTaskCount)
+  {
+    ioConfig.setTaskCount(desiredActiveTaskCount);
+    try {
+      Optional<SupervisorManager> supervisorManager = taskMaster.getSupervisorManager();
+      if (supervisorManager.isPresent()) {
+        MetadataSupervisorManager metadataSupervisorManager = supervisorManager.get().getMetadataSupervisorManager();
+        metadataSupervisorManager.insert(dataSource, spec);
+      } else {
+        log.error("supervisorManager is null in taskMaster, skipping scale action for dataSource [%s].", dataSource);
+      }
+    }
+    catch (Exception e) {
+      log.error(e, "Failed to sync taskCount to MetaStorage for dataSource [%s].", dataSource);
+    }
+  }
+
+  private void clearAllocationInfo()
+  {
+    activelyReadingTaskGroups.clear();
+    partitionGroups.clear();
+    partitionOffsets.clear();
+
+    pendingCompletionTaskGroups.clear();
+    partitionIds.clear();
+  }
+
   private class GracefulShutdownNotice extends ShutdownNotice
   {
     @Override
@@ -470,6 +596,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   private final SeekableStreamIndexTaskClient<PartitionIdType, SequenceOffsetType> taskClient;
   private final SeekableStreamSupervisorSpec spec;
   private final SeekableStreamSupervisorIOConfig ioConfig;
+  private final AutoScalerConfig autoScalerConfig;
   private final SeekableStreamSupervisorTuningConfig tuningConfig;
   private final SeekableStreamIndexTaskTuningConfig taskTuningConfig;
   private final String supervisorId;
@@ -488,6 +615,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   private final boolean useExclusiveStartingSequence;
   private boolean listenerRegistered = false;
   private long lastRunTime;
+  private long dynamicTriggerLastRunTime;
   private int initRetryCounter = 0;
   private volatile DateTime firstRunTime;
   private volatile DateTime earlyStopTime = null;
@@ -519,20 +647,40 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     this.useExclusiveStartingSequence = useExclusiveStartingSequence;
     this.dataSource = spec.getDataSchema().getDataSource();
     this.ioConfig = spec.getIoConfig();
+    this.autoScalerConfig = ioConfig.getAutoscalerConfig();
     this.tuningConfig = spec.getTuningConfig();
     this.taskTuningConfig = this.tuningConfig.convertToTaskTuningConfig();
     this.supervisorId = supervisorId;
     this.exec = Execs.singleThreaded(StringUtils.encodeForFormat(supervisorId));
     this.scheduledExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Scheduler-%d");
     this.reportingExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Reporting-%d");
+
     this.stateManager = new SeekableStreamSupervisorStateManager(
         spec.getSupervisorStateManagerConfig(),
         spec.isSuspended()
     );
 
-    int workerThreads = (this.tuningConfig.getWorkerThreads() != null
-                         ? this.tuningConfig.getWorkerThreads()
-                         : Math.min(10, this.ioConfig.getTaskCount()));
+    int workerThreads;
+    int chatThreads;
+    if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler()) {
+      log.info("Running Task autoscaler for datasource [%s]", dataSource);
+
+      workerThreads = (this.tuningConfig.getWorkerThreads() != null
+              ? this.tuningConfig.getWorkerThreads()
+              : Math.min(10, autoScalerConfig.getTaskCountMax()));
+
+      chatThreads = (this.tuningConfig.getChatThreads() != null
+              ? this.tuningConfig.getChatThreads()
+              : Math.min(10, autoScalerConfig.getTaskCountMax() * this.ioConfig.getReplicas()));
+    } else {
+      workerThreads = (this.tuningConfig.getWorkerThreads() != null
+              ? this.tuningConfig.getWorkerThreads()
+              : Math.min(10, this.ioConfig.getTaskCount()));
+
+      chatThreads = (this.tuningConfig.getChatThreads() != null
+              ? this.tuningConfig.getChatThreads()
+              : Math.min(10, this.ioConfig.getTaskCount() * this.ioConfig.getReplicas()));
+    }
 
     this.workerExec = MoreExecutors.listeningDecorator(
         Execs.multiThreaded(
@@ -578,9 +726,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                                          + IndexTaskClient.MAX_RETRY_WAIT_SECONDS)
     );
 
-    int chatThreads = (this.tuningConfig.getChatThreads() != null
-                       ? this.tuningConfig.getChatThreads()
-                       : Math.min(10, this.ioConfig.getTaskCount() * this.ioConfig.getReplicas()));
     this.taskClient = taskClientFactory.build(
         taskInfoProvider,
         dataSource,
@@ -597,6 +742,13 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     );
   }
 
+
+  @Override
+  public int getActiveTaskGroupsCount()
+  {
+    return activelyReadingTaskGroups.values().size();
+  }
+
   @Override
   public void start()
   {
@@ -659,6 +811,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         scheduledExec.shutdownNow(); // stop recurring executions
         reportingExec.shutdownNow();
 
+
         if (started) {
           Optional<TaskRunner> taskRunner = taskMaster.getTaskRunner();
           if (taskRunner.isPresent()) {
@@ -774,7 +927,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         );
 
         scheduleReporting(reportingExec);
-
         started = true;
         log.info(
             "Started SeekableStreamSupervisor[%s], first run in [%s], with spec: [%s]",
@@ -797,6 +949,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
+  public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction)
+  {
+    return () -> notices.add(new DynamicAllocationTasksNotice(scaleAction));
+  }
+
   private Runnable buildRunTask()
   {
     return () -> notices.add(new RunNotice());
@@ -1901,6 +2058,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     return false;
   }
 
+  public int getPartitionCount()
+  {
+    return recordSupplier.getPartitionIds(ioConfig.getStream()).size();
+  }
+
   private boolean updatePartitionDataFromStream()
   {
     List<PartitionIdType> previousPartitionIds = new ArrayList<>(partitionIds);
@@ -3510,29 +3672,21 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           return;
         }
 
-        long maxLag = 0, totalLag = 0, avgLag;
-        for (long lag : partitionLags.values()) {
-          if (lag > maxLag) {
-            maxLag = lag;
-          }
-          totalLag += lag;
-        }
-        avgLag = partitionLags.size() == 0 ? 0 : totalLag / partitionLags.size();
-
+        LagStats lagStats = computeLags(partitionLags);
         emitter.emit(
             ServiceMetricEvent.builder()
                               .setDimension("dataSource", dataSource)
-                              .build(StringUtils.format("ingest/%s/lag%s", type, suffix), totalLag)
+                              .build(StringUtils.format("ingest/%s/lag%s", type, suffix), lagStats.getTotalLag())
         );
         emitter.emit(
             ServiceMetricEvent.builder()
                               .setDimension("dataSource", dataSource)
-                              .build(StringUtils.format("ingest/%s/maxLag%s", type, suffix), maxLag)
+                              .build(StringUtils.format("ingest/%s/maxLag%s", type, suffix), lagStats.getMaxLag())
         );
         emitter.emit(
             ServiceMetricEvent.builder()
                               .setDimension("dataSource", dataSource)
-                              .build(StringUtils.format("ingest/%s/avgLag%s", type, suffix), avgLag)
+                              .build(StringUtils.format("ingest/%s/avgLag%s", type, suffix), lagStats.getAvgLag())
         );
       };
 
@@ -3545,6 +3699,24 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
+
+  /**
+   * Computes maxLag, totalLag and avgLag from the given per-partition lags.
+   * @param partitionLags lags per partition
+   */
+  protected LagStats computeLags(Map<PartitionIdType, Long> partitionLags)
+  {
+    long maxLag = 0, totalLag = 0, avgLag;
+    for (long lag : partitionLags.values()) {
+      if (lag > maxLag) {
+        maxLag = lag;
+      }
+      totalLag += lag;
+    }
+    avgLag = partitionLags.size() == 0 ? 0 : totalLag / partitionLags.size();
+    return new LagStats(maxLag, totalLag, avgLag);
+  }
+
   /**
    * a special sequence number that is used to indicate that the sequence offset
    * for a particular partition has not yet been calculated by the supervisor. When
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
index 723e22e..3ed55ec 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
 import org.apache.druid.java.util.common.IAE;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
@@ -37,7 +38,7 @@ public abstract class SeekableStreamSupervisorIOConfig
   @Nullable
   private final InputFormat inputFormat; // nullable for backward compatibility
   private final Integer replicas;
-  private final Integer taskCount;
+  private Integer taskCount;
   private final Duration taskDuration;
   private final Duration startDelay;
   private final Duration period;
@@ -46,6 +47,7 @@ public abstract class SeekableStreamSupervisorIOConfig
   private final Optional<Duration> lateMessageRejectionPeriod;
   private final Optional<Duration> earlyMessageRejectionPeriod;
   private final Optional<DateTime> lateMessageRejectionStartDateTime;
+  @Nullable private final AutoScalerConfig autoScalerConfig;
 
   public SeekableStreamSupervisorIOConfig(
       String stream,
@@ -59,13 +61,21 @@ public abstract class SeekableStreamSupervisorIOConfig
       Period completionTimeout,
       Period lateMessageRejectionPeriod,
       Period earlyMessageRejectionPeriod,
+      @Nullable AutoScalerConfig autoScalerConfig,
       DateTime lateMessageRejectionStartDateTime
   )
   {
     this.stream = Preconditions.checkNotNull(stream, "stream cannot be null");
     this.inputFormat = inputFormat;
     this.replicas = replicas != null ? replicas : 1;
-    this.taskCount = taskCount != null ? taskCount : 1;
+    // Could be null
+    this.autoScalerConfig = autoScalerConfig;
+    // If the autoscaler is enabled, the supplied taskCount is ignored here and the initial taskCount is set to taskCountMin
+    if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler()) {
+      this.taskCount = autoScalerConfig.getTaskCountMin();
+    } else {
+      this.taskCount = taskCount != null ? taskCount : 1;
+    }
     this.taskDuration = defaultDuration(taskDuration, "PT1H");
     this.startDelay = defaultDuration(startDelay, "PT5S");
     this.period = defaultDuration(period, "PT30S");
@@ -113,12 +123,24 @@ public abstract class SeekableStreamSupervisorIOConfig
     return replicas;
   }
 
+  @Nullable
+  @JsonProperty
+  public AutoScalerConfig getAutoscalerConfig()
+  {
+    return autoScalerConfig;
+  }
+
   @JsonProperty
   public Integer getTaskCount()
   {
     return taskCount;
   }
 
+  public void setTaskCount(final int taskCount)
+  {
+    this.taskCount = taskCount;
+  }
+
   @JsonProperty
   public Duration getTaskDuration()
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index 50be464..ff1d317 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -32,7 +32,10 @@ import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.supervisor.Supervisor;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
 import org.apache.druid.segment.indexing.DataSchema;
@@ -151,6 +154,21 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
   @Override
   public abstract Supervisor createSupervisor();
 
+  /**
+   * An autoScaler instance will be returned depending on the autoScalerConfig. In case autoScalerConfig is null, or the autoScaler is disabled, a NoopTaskAutoScaler will be returned.
+   * @param supervisor the supervisor whose task count the returned autoscaler will manage
+   * @return autoScaler
+   */
+  @Override
+  public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
+  {
+    AutoScalerConfig autoScalerConfig = ingestionSchema.getIOConfig().getAutoscalerConfig();
+    if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler() && supervisor instanceof SeekableStreamSupervisor) {
+      return autoScalerConfig.createAutoScaler(supervisor, this);
+    }
+    return new NoopTaskAutoScaler();
+  }
+
   @Override
   public List<String> getDataSources()
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
new file mode 100644
index 0000000..53174a1
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.guice.annotations.UnstableApi;
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+
+@UnstableApi
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "autoScalerStrategy", defaultImpl = LagBasedAutoScalerConfig.class)
+@JsonSubTypes(value = {
+        @Type(name = "lagBased", value = LagBasedAutoScalerConfig.class)
+})
+public interface AutoScalerConfig
+{
+  boolean getEnableTaskAutoScaler();
+  long getMinTriggerScaleActionFrequencyMillis();
+  int getTaskCountMax();
+  int getTaskCountMin();
+  SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec);
+}
+
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
new file mode 100644
index 0000000..8c64527
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+  private static final EmittingLogger log = new EmittingLogger(LagBasedAutoScaler.class);
+  private final String dataSource;
+  private final CircularFifoQueue<Long> lagMetricsQueue;
+  private final ScheduledExecutorService lagComputationExec;
+  private final ScheduledExecutorService allocationExec;
+  private final SupervisorSpec spec;
+  private final SeekableStreamSupervisor supervisor;
+  private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
+
+  private static final ReentrantLock LOCK = new ReentrantLock(true);
+
+  public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String dataSource,
+      LagBasedAutoScalerConfig autoScalerConfig, SupervisorSpec spec
+  )
+  {
+    this.lagBasedAutoScalerConfig = autoScalerConfig;
+    final String supervisorId = StringUtils.format("Supervisor-%s", dataSource);
+    this.dataSource = dataSource;
+    final int slots = (int) (lagBasedAutoScalerConfig.getLagCollectionRangeMillis() / lagBasedAutoScalerConfig
+        .getLagCollectionIntervalMillis()) + 1;
+    this.lagMetricsQueue = new CircularFifoQueue<>(slots);
+    this.allocationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Allocation-%d");
+    this.lagComputationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Computation-%d");
+    this.spec = spec;
+    this.supervisor = supervisor;
+  }
+
+  @Override
+  public void start()
+  {
+    Callable<Integer> scaleAction = () -> {
+      LOCK.lock();
+      int desiredTaskCount = -1;
+      try {
+        desiredTaskCount = computeDesiredTaskCount(new ArrayList<>(lagMetricsQueue));
+
+        if (desiredTaskCount != -1) {
+          lagMetricsQueue.clear();
+        }
+      }
+      catch (Exception ex) {
+        log.warn(ex, "Exception while computing desired task count for [%s]", dataSource);
+      }
+      finally {
+        LOCK.unlock();
+      }
+      return desiredTaskCount;
+    };
+
+    lagComputationExec.scheduleAtFixedRate(
+        computeAndCollectLag(),
+        lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for tasks to start up
+        lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
+        TimeUnit.MILLISECONDS
+    );
+    allocationExec.scheduleAtFixedRate(
+        supervisor.buildDynamicAllocationTask(scaleAction),
+        lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + lagBasedAutoScalerConfig
+            .getLagCollectionRangeMillis(),
+        lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),
+        TimeUnit.MILLISECONDS
+    );
+    log.info(
+        "LagBasedAutoScaler will collect lag every [%d] millis and will keep [%d] data points for the last [%d] millis for dataSource [%s]",
+        lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), lagMetricsQueue.size(),
+        lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), dataSource
+    );
+  }
+
+  @Override
+  public void stop()
+  {
+    allocationExec.shutdownNow();
+    lagComputationExec.shutdownNow();
+  }
+
+  @Override
+  public void reset()
+  {
+    // clear queue for kafka lags
+    if (lagMetricsQueue != null) {
+      try {
+        LOCK.lock();
+        lagMetricsQueue.clear();
+      }
+      catch (Exception e) {
+        log.warn(e, "Error,when clear queue in rest action");
+      }
+      finally {
+        LOCK.unlock();
+      }
+    }
+  }
+
+  /**
+   * This method computes current consumer lag. Gets the total lag of all partitions and fill in the lagMetricsQueue
+   *
+   * @return a Runnable that computes and collects lag.
+   */
+  private Runnable computeAndCollectLag()
+  {
+    return () -> {
+      LOCK.lock();
+      try {
+        if (!spec.isSuspended()) {
+          LagStats lagStats = supervisor.computeLagStats();
+          if (lagStats == null) {
+            lagMetricsQueue.offer(0L);
+          } else {
+            long totalLags = lagStats.getTotalLag();
+            lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L);
+          }
+          log.debug("Current lags [%s] for dataSource [%s].", new ArrayList<>(lagMetricsQueue), dataSource);
+        } else {
+          log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource);
+        }
+      }
+      catch (Exception e) {
+        log.error(e, "Error while collecting lags");
+      }
+      finally {
+        LOCK.unlock();
+      }
+    };
+  }
+
+  /**
+   * This method determines whether to do scale actions based on collected lag points.
+   * Current algorithm of scale is simple:
+   * First of all, compute the proportion of lag points higher/lower than scaleOutThreshold/scaleInThreshold, yielding beyondProportion/withinProportion.
+   * Secondly, compare beyondProportion/withinProportion with triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold. P.S. Scale out action has higher priority than scale in action.
+   * Finally, if beyondProportion/withinProportion is higher than triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold, a scale out/in action is triggered.
+   *
+   * @param lags the lag metrics of Stream(Kafka/Kinesis)
+   * @return Integer. The target taskCount; -1 means skip the scale action.
+   */
+  private int computeDesiredTaskCount(List<Long> lags)
+  {
+    // if supervisor is not suspended, ensure required tasks are running
+    // if suspended, ensure tasks have been requested to gracefully stop
+    log.debug("Computing desired task count for [%s], based on following lags : [%s]", dataSource, lags);
+    int beyond = 0;
+    int within = 0;
+    int metricsCount = lags.size();
+    for (Long lag : lags) {
+      if (lag >= lagBasedAutoScalerConfig.getScaleOutThreshold()) {
+        beyond++;
+      }
+      if (lag <= lagBasedAutoScalerConfig.getScaleInThreshold()) {
+        within++;
+      }
+    }
+    double beyondProportion = beyond * 1.0 / metricsCount;
+    double withinProportion = within * 1.0 / metricsCount;
+
+    log.debug("Calculated beyondProportion is [%s] and withinProportion is [%s] for dataSource [%s].", beyondProportion,
+        withinProportion, dataSource
+    );
+
+    int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+    int desiredActiveTaskCount;
+
+    if (beyondProportion >= lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
+      // Do Scale out
+      int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep();
+
+      int partitionCount = supervisor.getPartitionCount();
+      if (partitionCount <= 0) {
+        log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
+        return -1;
+      }
+
+      int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
+      if (currentActiveTaskCount == actualTaskCountMax) {
+        log.warn("CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource [%s].",
+            dataSource
+        );
+        return -1;
+      } else {
+        desiredActiveTaskCount = Math.min(taskCount, actualTaskCountMax);
+      }
+      return desiredActiveTaskCount;
+    }
+
+    if (withinProportion >= lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
+      // Do Scale in
+      int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
+      if (currentActiveTaskCount == lagBasedAutoScalerConfig.getTaskCountMin()) {
+        log.warn("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource [%s].",
+            dataSource
+        );
+        return -1;
+      } else {
+        desiredActiveTaskCount = Math.max(taskCount, lagBasedAutoScalerConfig.getTaskCountMin());
+      }
+      return desiredActiveTaskCount;
+    }
+    return -1;
+  }
+
+  public LagBasedAutoScalerConfig getAutoScalerConfig()
+  {
+    return lagBasedAutoScalerConfig;
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
new file mode 100644
index 0000000..0a4eb0b
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+
+import javax.annotation.Nullable;
+
+public class LagBasedAutoScalerConfig implements AutoScalerConfig
+{
+  private final long lagCollectionIntervalMillis;
+  private final long lagCollectionRangeMillis;
+  private final long scaleActionStartDelayMillis;
+  private final long scaleActionPeriodMillis;
+  private final long scaleOutThreshold;
+  private final long scaleInThreshold;
+  private final double triggerScaleOutFractionThreshold;
+  private final double triggerScaleInFractionThreshold;
+  private int taskCountMax;
+  private int taskCountMin;
+  private final int scaleInStep;
+  private final int scaleOutStep;
+  private final boolean enableTaskAutoScaler;
+  private final long minTriggerScaleActionFrequencyMillis;
+
+  @JsonCreator
+  public LagBasedAutoScalerConfig(
+          @Nullable @JsonProperty("lagCollectionIntervalMillis") Long lagCollectionIntervalMillis,
+          @Nullable @JsonProperty("lagCollectionRangeMillis") Long lagCollectionRangeMillis,
+          @Nullable @JsonProperty("scaleActionStartDelayMillis") Long scaleActionStartDelayMillis,
+          @Nullable @JsonProperty("scaleActionPeriodMillis") Long scaleActionPeriodMillis,
+          @Nullable @JsonProperty("scaleOutThreshold") Long scaleOutThreshold,
+          @Nullable @JsonProperty("scaleInThreshold") Long scaleInThreshold,
+          @Nullable @JsonProperty("triggerScaleOutFractionThreshold") Double triggerScaleOutFractionThreshold,
+          @Nullable @JsonProperty("triggerScaleInFractionThreshold") Double triggerScaleInFractionThreshold,
+          @JsonProperty("taskCountMax") Integer taskCountMax,
+          @JsonProperty("taskCountMin") Integer taskCountMin,
+          @Nullable @JsonProperty("scaleInStep") Integer scaleInStep,
+          @Nullable @JsonProperty("scaleOutStep") Integer scaleOutStep,
+          @Nullable @JsonProperty("enableTaskAutoScaler") Boolean enableTaskAutoScaler,
+          @Nullable @JsonProperty("minTriggerScaleActionFrequencyMillis") Long minTriggerScaleActionFrequencyMillis
+  )
+  {
+    this.enableTaskAutoScaler = enableTaskAutoScaler != null ? enableTaskAutoScaler : false;
+    this.lagCollectionIntervalMillis = lagCollectionIntervalMillis != null ? lagCollectionIntervalMillis : 30000;
+    this.lagCollectionRangeMillis = lagCollectionRangeMillis != null ? lagCollectionRangeMillis : 600000;
+    this.scaleActionStartDelayMillis = scaleActionStartDelayMillis != null ? scaleActionStartDelayMillis : 300000;
+    this.scaleActionPeriodMillis = scaleActionPeriodMillis != null ? scaleActionPeriodMillis : 60000;
+    this.scaleOutThreshold = scaleOutThreshold != null ? scaleOutThreshold : 6000000;
+    this.scaleInThreshold = scaleInThreshold != null ? scaleInThreshold : 1000000;
+    this.triggerScaleOutFractionThreshold = triggerScaleOutFractionThreshold != null ? triggerScaleOutFractionThreshold : 0.3;
+    this.triggerScaleInFractionThreshold = triggerScaleInFractionThreshold != null ? triggerScaleInFractionThreshold : 0.9;
+
+    // Only validate taskCountMax and taskCountMin when the autoscaler is enabled, so that users who leave autoScalerConfig empty ({}) do not get an exception and the autoscaler simply stays disabled.
+    // If the autoscaler is disabled, none of these configs are used regardless of what is set.
+    if (this.enableTaskAutoScaler) {
+      if (taskCountMax == null || taskCountMin == null) {
+        throw new RuntimeException("taskCountMax or taskCountMin can't be null!");
+      } else if (taskCountMax < taskCountMin) {
+        throw new RuntimeException("taskCountMax can't lower than taskCountMin!");
+      }
+      this.taskCountMax = taskCountMax;
+      this.taskCountMin = taskCountMin;
+    }
+
+    this.scaleInStep = scaleInStep != null ? scaleInStep : 1;
+    this.scaleOutStep = scaleOutStep != null ? scaleOutStep : 2;
+    this.minTriggerScaleActionFrequencyMillis = minTriggerScaleActionFrequencyMillis
+        != null ? minTriggerScaleActionFrequencyMillis : 600000;
+  }
+
+  @JsonProperty
+  public long getLagCollectionIntervalMillis()
+  {
+    return lagCollectionIntervalMillis;
+  }
+
+  @JsonProperty
+  public long getLagCollectionRangeMillis()
+  {
+    return lagCollectionRangeMillis;
+  }
+
+  @JsonProperty
+  public long getScaleActionStartDelayMillis()
+  {
+    return scaleActionStartDelayMillis;
+  }
+
+  @JsonProperty
+  public long getScaleActionPeriodMillis()
+  {
+    return scaleActionPeriodMillis;
+  }
+
+  @JsonProperty
+  public long getScaleOutThreshold()
+  {
+    return scaleOutThreshold;
+  }
+
+  @JsonProperty
+  public long getScaleInThreshold()
+  {
+    return scaleInThreshold;
+  }
+
+  @JsonProperty
+  public double getTriggerScaleOutFractionThreshold()
+  {
+    return triggerScaleOutFractionThreshold;
+  }
+
+  @JsonProperty
+  public double getTriggerScaleInFractionThreshold()
+  {
+    return triggerScaleInFractionThreshold;
+  }
+
+  @Override
+  @JsonProperty
+  public int getTaskCountMax()
+  {
+    return taskCountMax;
+  }
+
+  @Override
+  @JsonProperty
+  public int getTaskCountMin()
+  {
+    return taskCountMin;
+  }
+
+  @Override
+  public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec)
+  {
+    return new LagBasedAutoScaler((SeekableStreamSupervisor) supervisor, spec.getId(), this, spec);
+  }
+
+  @JsonProperty
+  public int getScaleInStep()
+  {
+    return scaleInStep;
+  }
+
+  @JsonProperty
+  public int getScaleOutStep()
+  {
+    return scaleOutStep;
+  }
+
+  @Override
+  @JsonProperty
+  public boolean getEnableTaskAutoScaler()
+  {
+    return enableTaskAutoScaler;
+  }
+
+  @Override
+  @JsonProperty
+  public long getMinTriggerScaleActionFrequencyMillis()
+  {
+    return minTriggerScaleActionFrequencyMillis;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "autoScalerConfig{" +
+            "enableTaskAutoScaler=" + enableTaskAutoScaler +
+            ", taskCountMax=" + taskCountMax +
+            ", taskCountMin=" + taskCountMin +
+            ", minTriggerScaleActionFrequencyMillis=" + minTriggerScaleActionFrequencyMillis +
+            ", lagCollectionIntervalMillis=" + lagCollectionIntervalMillis +
+            ", lagCollectionRangeMillis=" + lagCollectionRangeMillis +
+            ", scaleOutThreshold=" + scaleOutThreshold +
+            ", triggerScaleOutFractionThreshold=" + triggerScaleOutFractionThreshold +
+            ", scaleInThreshold=" + scaleInThreshold +
+            ", triggerScaleInFractionThreshold=" + triggerScaleInFractionThreshold +
+            ", scaleActionStartDelayMillis=" + scaleActionStartDelayMillis +
+            ", scaleActionPeriodMillis=" + scaleActionPeriodMillis +
+            ", scaleInStep=" + scaleInStep +
+            ", scaleOutStep=" + scaleOutStep +
+            '}';
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/NoopTaskAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/NoopTaskAutoScaler.java
new file mode 100644
index 0000000..9bf41e5
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/NoopTaskAutoScaler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+
+public class NoopTaskAutoScaler implements SupervisorTaskAutoScaler
+{
+  public NoopTaskAutoScaler()
+  {
+  }
+
+  @Override
+  public void start()
+  {
+    //Do nothing
+  }
+
+  @Override
+  public void stop()
+  {
+    //Do nothing
+  }
+
+  @Override
+  public void reset()
+  {
+    //Do nothing
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java
index e603a55..3b775ca 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java
@@ -32,6 +32,8 @@ import org.apache.druid.indexing.overlord.supervisor.Supervisor;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorResource;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler;
 import org.apache.druid.indexing.worker.http.WorkerResource;
 import org.apache.druid.server.http.security.ResourceFilterTestHelper;
 import org.apache.druid.server.security.AuthorizerMapper;
@@ -127,6 +129,12 @@ public class OverlordSecurityResourceFilterTest extends ResourceFilterTestHelper
         }
 
         @Override
+        public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
+        {
+          return new NoopTaskAutoScaler();
+        }
+
+        @Override
         public List<String> getDataSources()
         {
           return ImmutableList.of("test");
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index 1390e7a..c22460e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.TaskMaster;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.server.security.Access;
@@ -1156,6 +1157,12 @@ public class SupervisorResourceTest extends EasyMockSupport
     }
 
     @Override
+    public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
+    {
+      return null;
+    }
+
+    @Override
     public List<String> getDataSources()
     {
       return datasources;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
new file mode 100644
index 0000000..51a39fc
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -0,0 +1,950 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import org.apache.druid.indexing.overlord.TaskMaster;
+import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScaler;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ScheduledExecutorService;
+
+public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
+{
+  private SeekableStreamSupervisorIngestionSpec ingestionSchema;
+  private TaskStorage taskStorage;
+  private TaskMaster taskMaster;
+  private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
+  private ServiceEmitter emitter;
+  private RowIngestionMetersFactory rowIngestionMetersFactory;
+  private DataSchema dataSchema;
+  private SeekableStreamSupervisorTuningConfig seekableStreamSupervisorTuningConfig;
+  private SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig;
+  private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
+  private SeekableStreamIndexTaskClientFactory taskClientFactory;
+  private static final String STREAM = "stream";
+  private static final String DATASOURCE = "testDS";
+  private SeekableStreamSupervisorSpec spec;
+  private SupervisorStateManagerConfig supervisorConfig;
+
+  private SeekableStreamSupervisor supervisor4;
+
+  private SeekableStreamIndexTaskClientFactory indexTaskClientFactory;
+  private ObjectMapper mapper;
+  private DruidMonitorSchedulerConfig monitorSchedulerConfig;
+  private SupervisorStateManagerConfig supervisorStateManagerConfig;
+
+  @Before
+  public void setUp()
+  {
+    ingestionSchema = EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class);
+    taskStorage = EasyMock.mock(TaskStorage.class);
+    taskMaster = EasyMock.mock(TaskMaster.class);
+    indexerMetadataStorageCoordinator = EasyMock.mock(IndexerMetadataStorageCoordinator.class);
+    emitter = EasyMock.mock(ServiceEmitter.class);
+    rowIngestionMetersFactory = EasyMock.mock(RowIngestionMetersFactory.class);
+    dataSchema = EasyMock.mock(DataSchema.class);
+    seekableStreamSupervisorTuningConfig = EasyMock.mock(SeekableStreamSupervisorTuningConfig.class);
+    seekableStreamSupervisorIOConfig = EasyMock.mock(SeekableStreamSupervisorIOConfig.class);
+    taskClientFactory = EasyMock.mock(SeekableStreamIndexTaskClientFactory.class);
+    spec = EasyMock.mock(SeekableStreamSupervisorSpec.class);
+    supervisorConfig = new SupervisorStateManagerConfig();
+    indexTaskClientFactory = EasyMock.mock(SeekableStreamIndexTaskClientFactory.class);
+    mapper = new DefaultObjectMapper();
+    monitorSchedulerConfig = EasyMock.mock(DruidMonitorSchedulerConfig.class);
+    supervisorStateManagerConfig = EasyMock.mock(SupervisorStateManagerConfig.class);
+    supervisor4 = EasyMock.mock(SeekableStreamSupervisor.class);
+  }
+
+  private abstract class BaseTestSeekableStreamSupervisor extends SeekableStreamSupervisor<String, String, ByteEntity>
+  {
+    private BaseTestSeekableStreamSupervisor()
+    {
+      super(
+              "testSupervisorId",
+              taskStorage,
+              taskMaster,
+              indexerMetadataStorageCoordinator,
+              taskClientFactory,
+              OBJECT_MAPPER,
+              spec,
+              rowIngestionMetersFactory,
+              false
+      );
+    }
+
+    @Override
+    protected String baseTaskName()
+    {
+      return "test";
+    }
+
+    @Override
+    protected void updatePartitionLagFromStream()
+    {
+        // do nothing
+    }
+
+    @Nullable
+    @Override
+    protected Map<String, Long> getPartitionRecordLag()
+    {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    protected Map<String, Long> getPartitionTimeLag()
+    {
+      return null;
+    }
+
+    @Override
+    protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
+            int groupId,
+            Map<String, String> startPartitions,
+            Map<String, String> endPartitions,
+            String baseSequenceName,
+            DateTime minimumMessageTime,
+            DateTime maximumMessageTime,
+            Set<String> exclusiveStartSequenceNumberPartitions,
+            SeekableStreamSupervisorIOConfig ioConfig
+    )
+    {
+      return new SeekableStreamIndexTaskIOConfig<String, String>(
+              groupId,
+              baseSequenceName,
+              new SeekableStreamStartSequenceNumbers<>(STREAM, startPartitions, exclusiveStartSequenceNumberPartitions),
+              new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions),
+              true,
+              minimumMessageTime,
+              maximumMessageTime,
+              ioConfig.getInputFormat()
+      )
+      {
+      };
+    }
+
+    @Override
+    protected List<SeekableStreamIndexTask<String, String, ByteEntity>> createIndexTasks(
+            int replicas,
+            String baseSequenceName,
+            ObjectMapper sortingMapper,
+            TreeMap<Integer, Map<String, String>> sequenceOffsets,
+            SeekableStreamIndexTaskIOConfig taskIoConfig,
+            SeekableStreamIndexTaskTuningConfig taskTuningConfig,
+            RowIngestionMetersFactory rowIngestionMetersFactory
+    )
+    {
+      return null;
+    }
+
+    @Override
+    protected int getTaskGroupIdForPartition(String partition)
+    {
+      return 0;
+    }
+
+    @Override
+    protected boolean checkSourceMetadataMatch(DataSourceMetadata metadata)
+    {
+      return true;
+    }
+
+    @Override
+    protected boolean doesTaskTypeMatchSupervisor(Task task)
+    {
+      return true;
+    }
+
+    @Override
+    protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(
+            String stream,
+            Map<String, String> map
+    )
+    {
+      return null;
+    }
+
+    @Override
+    protected OrderedSequenceNumber<String> makeSequenceNumber(String seq, boolean isExclusive)
+    {
+      return new OrderedSequenceNumber<String>(seq, isExclusive)
+      {
+        @Override
+        public int compareTo(OrderedSequenceNumber<String> o)
+        {
+          return new BigInteger(this.get()).compareTo(new BigInteger(o.get()));
+        }
+      };
+    }
+
+    @Override
+    protected Map<String, Long> getRecordLagPerPartition(Map<String, String> currentOffsets)
+    {
+      return null;
+    }
+
+    @Override
+    protected Map<String, Long> getTimeLagPerPartition(Map<String, String> currentOffsets)
+    {
+      return null;
+    }
+
+    @Override
+    protected RecordSupplier<String, String, ByteEntity> setupRecordSupplier()
+    {
+      return recordSupplier;
+    }
+
+    @Override
+    protected SeekableStreamSupervisorReportPayload<String, String> createReportPayload(
+            int numPartitions,
+            boolean includeOffsets
+    )
+    {
+      return new SeekableStreamSupervisorReportPayload<String, String>(
+              DATASOURCE,
+              STREAM,
+              1,
+              1,
+              1L,
+              null,
+              null,
+              null,
+              null,
+              null,
+              null,
+              false,
+              true,
+              null,
+              null,
+              null
+      )
+      {
+      };
+    }
+
+    @Override
+    protected String getNotSetMarker()
+    {
+      return "NOT_SET";
+    }
+
+    @Override
+    protected String getEndOfPartitionMarker()
+    {
+      return "EOF";
+    }
+
+    @Override
+    protected boolean isEndOfShard(String seqNum)
+    {
+      return false;
+    }
+
+    @Override
+    protected boolean isShardExpirationMarker(String seqNum)
+    {
+      return false;
+    }
+
+    @Override
+    protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
+    {
+      return false;
+    }
+  }
+
+  private class TestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor
+  {
+    private int partitionNumbers;
+
+    public TestSeekableStreamSupervisor(int partitionNumbers)
+    {
+      this.partitionNumbers = partitionNumbers;
+    }
+
+    @Override
+    protected void scheduleReporting(ScheduledExecutorService reportingExec)
+    {
+        // do nothing
+    }
+
+    @Override
+    public LagStats computeLagStats()
+    {
+      return new LagStats(0, 0, 0);
+    }
+
+    @Override
+    public int getPartitionCount()
+    {
+      return partitionNumbers;
+    }
+  }
+
+
+  private static class TestSeekableStreamSupervisorSpec extends SeekableStreamSupervisorSpec
+  {
+    private SeekableStreamSupervisor supervisor;
+    private String id;
+
+    public TestSeekableStreamSupervisorSpec(SeekableStreamSupervisorIngestionSpec ingestionSchema,
+                                             @Nullable Map<String, Object> context,
+                                             Boolean suspended,
+                                             TaskStorage taskStorage,
+                                             TaskMaster taskMaster,
+                                             IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
+                                             SeekableStreamIndexTaskClientFactory indexTaskClientFactory,
+                                             ObjectMapper mapper,
+                                             ServiceEmitter emitter,
+                                             DruidMonitorSchedulerConfig monitorSchedulerConfig,
+                                             RowIngestionMetersFactory rowIngestionMetersFactory,
+                                             SupervisorStateManagerConfig supervisorStateManagerConfig,
+                                             SeekableStreamSupervisor supervisor,
+                                             String id)
+    {
+      super(
+              ingestionSchema,
+              context,
+              suspended,
+              taskStorage,
+              taskMaster,
+              indexerMetadataStorageCoordinator,
+              indexTaskClientFactory,
+              mapper,
+              emitter,
+              monitorSchedulerConfig,
+              rowIngestionMetersFactory,
+              supervisorStateManagerConfig);
+
+      this.supervisor = supervisor;
+      this.id = id;
+    }
+
+    @Override
+    public List<String> getDataSources()
+    {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public String getId()
+    {
+      return id;
+    }
+
+    @Override
+    public Supervisor createSupervisor()
+    {
+      return supervisor;
+    }
+
+    @Override
+    public String getType()
+    {
+      return null;
+    }
+
+    @Override
+    public String getSource()
+    {
+      return null;
+    }
+
+    @Override
+    protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend)
+    {
+      return null;
+    }
+  }
+
+  private static SeekableStreamSupervisorTuningConfig getTuningConfig()
+  {
+    return new SeekableStreamSupervisorTuningConfig()
+    {
+      @Override
+      public Integer getWorkerThreads()
+      {
+        return 1;
+      }
+
+      @Override
+      public Integer getChatThreads()
+      {
+        return 1;
+      }
+
+      @Override
+      public Long getChatRetries()
+      {
+        return 1L;
+      }
+
+      @Override
+      public Duration getHttpTimeout()
+      {
+        return new Period("PT1M").toStandardDuration();
+      }
+
+      @Override
+      public Duration getShutdownTimeout()
+      {
+        return new Period("PT1S").toStandardDuration();
+      }
+
+      @Override
+      public Duration getRepartitionTransitionDuration()
+      {
+        return new Period("PT2M").toStandardDuration();
+      }
+
+      @Override
+      public Duration getOffsetFetchPeriod()
+      {
+        return new Period("PT5M").toStandardDuration();
+      }
+
+      @Override
+      public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig()
+      {
+        return new SeekableStreamIndexTaskTuningConfig(
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null
+        )
+        {
+          @Override
+          public SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir)
+          {
+            return null;
+          }
+
+          @Override
+          public String toString()
+          {
+            return null;
+          }
+        };
+      }
+    };
+  }
+
+  @Test
+  public void testAutoScalerConfig()
+  {
+    AutoScalerConfig autoScalerConfigEmpty = mapper.convertValue(new HashMap<>(), AutoScalerConfig.class);
+    Assert.assertTrue(autoScalerConfigEmpty instanceof LagBasedAutoScalerConfig);
+    Assert.assertFalse(autoScalerConfigEmpty.getEnableTaskAutoScaler());
+
+    AutoScalerConfig autoScalerConfigNull = mapper.convertValue(null, AutoScalerConfig.class);
+    Assert.assertNull(autoScalerConfigNull);
+
+    AutoScalerConfig autoScalerConfigDefault = mapper.convertValue(ImmutableMap.of("autoScalerStrategy", "lagBased"), AutoScalerConfig.class);
+    Assert.assertTrue(autoScalerConfigDefault instanceof LagBasedAutoScalerConfig);
+
+    AutoScalerConfig autoScalerConfigValue = mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis", "1"), AutoScalerConfig.class);
+    Assert.assertTrue(autoScalerConfigValue instanceof LagBasedAutoScalerConfig);
+    LagBasedAutoScalerConfig lagBasedAutoScalerConfig = (LagBasedAutoScalerConfig) autoScalerConfigValue;
+    Assert.assertEquals(lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), 1);
+
+    Exception e = null;
+    try {
+      AutoScalerConfig autoScalerError = mapper.convertValue(ImmutableMap.of("enableTaskAutoScaler", "true", "taskCountMax", "1", "taskCountMin", "4"), AutoScalerConfig.class);
+    }
+    catch (RuntimeException ex) {
+      e = ex;
+    }
+    Assert.assertNotNull(e);
+
+    e = null;
+    try {
+      // taskCountMax and taskCountMin couldn't be ignored.
+      AutoScalerConfig autoScalerError2 = mapper.convertValue(ImmutableMap.of("enableTaskAutoScaler", "true"), AutoScalerConfig.class);
+    }
+    catch (RuntimeException ex) {
+      e = ex;
+    }
+    Assert.assertNotNull(e);
+
+
+  }
+
+  @Test
+  public void testAutoScalerCreated()
+  {
+    HashMap<String, Object> autoScalerConfig = new HashMap<>();
+    autoScalerConfig.put("enableTaskAutoScaler", true);
+    autoScalerConfig.put("lagCollectionIntervalMillis", 500);
+    autoScalerConfig.put("lagCollectionRangeMillis", 500);
+    autoScalerConfig.put("scaleOutThreshold", 5000000);
+    autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.3);
+    autoScalerConfig.put("scaleInThreshold", 1000000);
+    autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
+    autoScalerConfig.put("scaleActionStartDelayMillis", 0);
+    autoScalerConfig.put("scaleActionPeriodMillis", 100);
+    autoScalerConfig.put("taskCountMax", 8);
+    autoScalerConfig.put("taskCountMin", 1);
+    autoScalerConfig.put("scaleInStep", 1);
+    autoScalerConfig.put("scaleOutStep", 2);
+    autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
+
+    EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+    EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+    EasyMock.replay(ingestionSchema);
+
+    EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes();
+    EasyMock.replay(seekableStreamSupervisorIOConfig);
+
+    EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
+    EasyMock.replay(supervisor4);
+
+    TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(ingestionSchema,
+            null,
+            false,
+            taskStorage,
+            taskMaster,
+            indexerMetadataStorageCoordinator,
+            indexTaskClientFactory,
+            mapper,
+            emitter,
+            monitorSchedulerConfig,
+            rowIngestionMetersFactory,
+            supervisorStateManagerConfig,
+            supervisor4,
+            "id1");
+    SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor4);
+    Assert.assertTrue(autoscaler instanceof LagBasedAutoScaler);
+
+    EasyMock.reset(seekableStreamSupervisorIOConfig);
+    autoScalerConfig.put("enableTaskAutoScaler", false);
+    EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes();
+    EasyMock.replay(seekableStreamSupervisorIOConfig);
+    SupervisorTaskAutoScaler autoscaler2 = spec.createAutoscaler(supervisor4);
+    Assert.assertTrue(autoscaler2 instanceof NoopTaskAutoScaler);
+
+    EasyMock.reset(seekableStreamSupervisorIOConfig);
+    autoScalerConfig.remove("enableTaskAutoScaler");
+    EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes();
+    EasyMock.replay(seekableStreamSupervisorIOConfig);
+    SupervisorTaskAutoScaler autoscaler3 = spec.createAutoscaler(supervisor4);
+    Assert.assertTrue(autoscaler3 instanceof NoopTaskAutoScaler);
+
+    EasyMock.reset(seekableStreamSupervisorIOConfig);
+    autoScalerConfig.clear();
+    EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes();
+    EasyMock.replay(seekableStreamSupervisorIOConfig);
+    Assert.assertTrue(autoScalerConfig.isEmpty());
+    SupervisorTaskAutoScaler autoscaler4 = spec.createAutoscaler(supervisor4);
+    Assert.assertTrue(autoscaler4 instanceof NoopTaskAutoScaler);
+
+  }
+
+  @Test
+  public void testDefaultAutoScalerConfigCreatedWithDefault()
+  {
+    EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+    EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+    EasyMock.replay(ingestionSchema);
+
+    EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis", "1", "enableTaskAutoScaler", true, "taskCountMax", "4", "taskCountMin", "1"), AutoScalerConfig.class)).anyTimes();
+    EasyMock.replay(seekableStreamSupervisorIOConfig);
+
+    EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
+    EasyMock.replay(supervisor4);
+
+    TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(ingestionSchema,
+            null,
+            false,
+            taskStorage,
+            taskMaster,
+            indexerMetadataStorageCoordinator,
+            indexTaskClientFactory,
+            mapper,
+            emitter,
+            monitorSchedulerConfig,
+            rowIngestionMetersFactory,
+            supervisorStateManagerConfig,
+            supervisor4,
+            "id1");
+    SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor4);
+    Assert.assertTrue(autoscaler instanceof LagBasedAutoScaler);
+    LagBasedAutoScaler lagBasedAutoScaler = (LagBasedAutoScaler) autoscaler;
+    LagBasedAutoScalerConfig lagBasedAutoScalerConfig = lagBasedAutoScaler.getAutoScalerConfig();
+    Assert.assertEquals(lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), 1);
+    Assert.assertEquals(lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), 600000);
+    Assert.assertEquals(lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), 300000);
+    Assert.assertEquals(lagBasedAutoScalerConfig.getScaleActionPeriodMillis(), 60000);
+    Assert.assertEquals(lagBasedAutoScalerConfig.getScaleOutThreshold(), 6000000);
+    Assert.assertEquals(lagBasedAutoScalerConfig.getScaleInThreshold(), 1000000);
+    Assert.assertEquals(lagBasedAutoScalerConfig.getTaskCountMax(), 4);
+    Assert.assertEquals(lagBasedAutoScalerConfig.getTaskCountMin(), 1);
+    Assert.assertEquals(lagBasedAutoScalerConfig.getScaleInStep(), 1);
+    Assert.assertEquals(lagBasedAutoScalerConfig.getScaleOutStep(), 2);
+    Assert.assertEquals(lagBasedAutoScalerConfig.getMinTriggerScaleActionFrequencyMillis(), 600000);
+  }
+
+  @Test
+  public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedException
+  {
+
+    EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
+    EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes();
+    EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+    EasyMock.replay(spec);
+
+    EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+    EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+    EasyMock.replay(ingestionSchema);
+
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.replay(taskMaster);
+
+    TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3);
+
+    LagStats lagStats = supervisor.computeLagStats();
+
+    LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, mapper.convertValue(getScaleOutProperties(2), LagBasedAutoScalerConfig.class), spec);
+    supervisor.start();
+    autoScaler.start();
+    supervisor.runInternal();
+    int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
+    Assert.assertEquals(1, taskCountBeforeScaleOut);
+    Thread.sleep(1 * 1000);
+    int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
+    Assert.assertEquals(2, taskCountAfterScaleOut);
+
+    autoScaler.reset();
+    autoScaler.stop();
+
+  }
+
+  @Test
+  public void testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() throws InterruptedException
+  {
+
+    EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
+    EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes();
+    EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+    EasyMock.replay(spec);
+
+    EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+    EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+    EasyMock.replay(ingestionSchema);
+
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.replay(taskMaster);
+
+    TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(2);
+
+    LagStats lagStats = supervisor.computeLagStats();
+
+    LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, mapper.convertValue(getScaleOutProperties(3), LagBasedAutoScalerConfig.class), spec);
+    supervisor.start();
+    autoScaler.start();
+    supervisor.runInternal();
+    int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
+    Assert.assertEquals(1, taskCountBeforeScaleOut);
+    Thread.sleep(1 * 1000);
+    int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
+    Assert.assertEquals(2, taskCountAfterScaleOut);
+
+    autoScaler.reset();
+    autoScaler.stop();
+
+  }
+
+  @Test
+  public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedException
+  {
+
+    EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
+    EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2, false)).anyTimes();
+    EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+    EasyMock.replay(spec);
+
+    EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+    EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+    EasyMock.replay(ingestionSchema);
+
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.replay(taskMaster);
+
+    TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3);
+    LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, mapper.convertValue(getScaleInProperties(), LagBasedAutoScalerConfig.class), spec);
+
+    // enable autoscaler so that taskcount config will be ignored and init value of taskCount will use taskCountMin.
+    Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount());
+    supervisor.getIoConfig().setTaskCount(2);
+    supervisor.start();
+    autoScaler.start();
+    supervisor.runInternal();
+    int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
+    Assert.assertEquals(2, taskCountBeforeScaleOut);
+    Thread.sleep(1 * 1000);
+    int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
+    Assert.assertEquals(1, taskCountAfterScaleOut);
+
+    autoScaler.reset();
+    autoScaler.stop();
+  }
+
+  /**
+   * Verifies that when autoscaling is disabled (a NoopTaskAutoScaler is used),
+   * the supervisor's taskCount is left untouched: it remains at the configured
+   * value both before and after the autoscaler would have had time to act.
+   */
+  @Test
+  public void testSeekableStreamSupervisorSpecWithScaleDisable() throws InterruptedException
+  {
+    SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig = new SeekableStreamSupervisorIOConfig(
+            "stream",
+            new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
+            1,
+            1,
+            new Period("PT1H"),
+            new Period("P1D"),
+            new Period("PT30S"),
+            false,
+            new Period("PT30M"),
+            null,
+            null,
+            null,
+            null
+    ) {};
+    EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+    EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+    EasyMock.expect(spec.getIoConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+    EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+    EasyMock.replay(spec);
+
+    EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(this.seekableStreamSupervisorIOConfig).anyTimes();
+    EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+    EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+    EasyMock.replay(ingestionSchema);
+
+    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+    EasyMock.replay(taskMaster);
+
+    TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3);
+    NoopTaskAutoScaler autoScaler = new NoopTaskAutoScaler();
+    supervisor.start();
+    autoScaler.start();
+    supervisor.runInternal();
+    int taskCountBeforeSleep = supervisor.getIoConfig().getTaskCount();
+    Assert.assertEquals(1, taskCountBeforeSleep);
+    // Give a real autoscaler more than enough time to act (the enabled-scaler
+    // tests use scaleActionPeriodMillis=100); the noop scaler must not change taskCount.
+    Thread.sleep(1000);
+    int taskCountAfterSleep = supervisor.getIoConfig().getTaskCount();
+    Assert.assertEquals(1, taskCountAfterSleep);
+
+    autoScaler.reset();
+    autoScaler.stop();
+  }
+
+  /**
+   * Builds the DataSchema shared by these autoscaler tests: two string
+   * dimensions, a row-count aggregator, and hourly uniform segment granularity.
+   */
+  private static DataSchema getDataSchema()
+  {
+    List<DimensionSchema> dimensions = new ArrayList<>();
+    dimensions.add(StringDimensionSchema.create("dim1"));
+    dimensions.add(StringDimensionSchema.create("dim2"));
+
+    return new DataSchema(
+            DATASOURCE,
+            new TimestampSpec("timestamp", "iso", null),
+            new DimensionsSpec(
+                    dimensions,
+                    null,
+                    null
+            ),
+            new AggregatorFactory[]{new CountAggregatorFactory("rows")},
+            new UniformGranularitySpec(
+                    Granularities.HOUR,
+                    Granularities.NONE,
+                    ImmutableList.of()
+            ),
+            null
+    );
+  }
+
+  private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scaleOut)
+  {
+    if (scaleOut) {
+      return new SeekableStreamSupervisorIOConfig(
+              "stream",
+              new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
+              1,
+              taskCount,
+              new Period("PT1H"),
+              new Period("P1D"),
+              new Period("PT30S"),
+              false,
+              new Period("PT30M"),
+              null,
+              null, mapper.convertValue(getScaleOutProperties(2), AutoScalerConfig.class), null
+      ) {};
+    } else {
+      return new SeekableStreamSupervisorIOConfig(
+              "stream",
+              new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
+              1,
+              taskCount,
+              new Period("PT1H"),
+              new Period("P1D"),
+              new Period("PT30S"),
+              false,
+              new Period("PT30M"),
+              null,
+              null, mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class), null
+        ) {};
+    }
+  }
+
+  /**
+   * Autoscaler properties primed to force a scale-out: scaleOutThreshold and
+   * triggerScaleOutFractionThreshold are zero, so any observed lag triggers
+   * growth (by scaleOutStep) up to {@code maxTaskCount}.
+   */
+  private static Map<String, Object> getScaleOutProperties(int maxTaskCount)
+  {
+    HashMap<String, Object> autoScalerConfig = new HashMap<>();
+    autoScalerConfig.put("enableTaskAutoScaler", true);
+    autoScalerConfig.put("lagCollectionIntervalMillis", 500);
+    autoScalerConfig.put("lagCollectionRangeMillis", 500);
+    autoScalerConfig.put("scaleOutThreshold", 0);
+    autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.0);
+    autoScalerConfig.put("scaleInThreshold", 1000000);
+    autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
+    autoScalerConfig.put("scaleActionStartDelayMillis", 0);
+    autoScalerConfig.put("scaleActionPeriodMillis", 100);
+    autoScalerConfig.put("taskCountMax", maxTaskCount);
+    autoScalerConfig.put("taskCountMin", 1);
+    autoScalerConfig.put("scaleInStep", 1);
+    autoScalerConfig.put("scaleOutStep", 2);
+    autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
+    return autoScalerConfig;
+  }
+
+  /**
+   * Autoscaler properties primed to force a scale-in: scaleOutThreshold is very
+   * high while scaleInThreshold and triggerScaleInFractionThreshold are zero,
+   * so observed lag always looks small and the task count shrinks (by
+   * scaleInStep) toward taskCountMin.
+   */
+  private static Map<String, Object> getScaleInProperties()
+  {
+    HashMap<String, Object> autoScalerConfig = new HashMap<>();
+    autoScalerConfig.put("enableTaskAutoScaler", true);
+    autoScalerConfig.put("lagCollectionIntervalMillis", 500);
+    autoScalerConfig.put("lagCollectionRangeMillis", 500);
+    autoScalerConfig.put("scaleOutThreshold", 8000000);
+    autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.3);
+    autoScalerConfig.put("scaleInThreshold", 0);
+    autoScalerConfig.put("triggerScaleInFractionThreshold", 0.0);
+    autoScalerConfig.put("scaleActionStartDelayMillis", 0);
+    autoScalerConfig.put("scaleActionPeriodMillis", 100);
+    autoScalerConfig.put("taskCountMax", 2);
+    autoScalerConfig.put("taskCountMin", 1);
+    autoScalerConfig.put("scaleInStep", 1);
+    autoScalerConfig.put("scaleOutStep", 2);
+    autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
+    return autoScalerConfig;
+  }
+
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 6adf4c8..42e8c55 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -43,6 +43,7 @@ import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager.BasicState;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
 import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
@@ -58,6 +59,7 @@ import org.apache.druid.indexing.seekablestream.common.StreamException;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamState;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -82,9 +84,11 @@ import org.junit.Before;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
+
 import java.io.File;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -765,6 +769,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         new Period("PT30M"),
         null,
         null,
+        null,
         null
     )
     {
@@ -825,12 +830,32 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
         false,
         new Period("PT30M"),
         null,
-        null, null
+        null, OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class), null
     )
     {
     };
   }
 
+  /**
+   * Default autoscaler configuration (enableTaskAutoScaler=true) embedded in
+   * the supervisor IO config built for these state tests.
+   */
+  private static Map<String, Object> getProperties()
+  {
+    HashMap<String, Object> autoScalerConfig = new HashMap<>();
+    autoScalerConfig.put("enableTaskAutoScaler", true);
+    autoScalerConfig.put("lagCollectionIntervalMillis", 500);
+    autoScalerConfig.put("lagCollectionRangeMillis", 500);
+    autoScalerConfig.put("scaleOutThreshold", 5000000);
+    autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.3);
+    autoScalerConfig.put("scaleInThreshold", 1000000);
+    autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
+    autoScalerConfig.put("scaleActionStartDelayMillis", 0);
+    autoScalerConfig.put("scaleActionPeriodMillis", 100);
+    autoScalerConfig.put("taskCountMax", 8);
+    autoScalerConfig.put("taskCountMin", 1);
+    autoScalerConfig.put("scaleInStep", 1);
+    autoScalerConfig.put("scaleOutStep", 2);
+    autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
+    return autoScalerConfig;
+  }
+
   private static SeekableStreamSupervisorTuningConfig getTuningConfig()
   {
     return new SeekableStreamSupervisorTuningConfig()
@@ -1177,6 +1202,12 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     {
       // do nothing
     }
+
+    @Override
+    public LagStats computeLagStats()
+    {
+      return null;
+    }
   }
 
   private class TestEmittingTestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor
@@ -1220,6 +1251,12 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     }
 
     @Override
+    public LagStats computeLagStats()
+    {
+      return null;
+    }
+
+    @Override
     protected void scheduleReporting(ScheduledExecutorService reportingExec)
     {
       SeekableStreamSupervisorIOConfig ioConfig = spec.getIoConfig();
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
index 5c2a18e..c38b67a 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
@@ -74,10 +74,14 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
 
   private static final String QUERIES_FILE = "/stream/queries/stream_index_queries.json";
   private static final String SUPERVISOR_SPEC_TEMPLATE_FILE = "supervisor_spec_template.json";
+  private static final String SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE = "supervisor_with_autoscaler_spec_template.json";
 
   protected static final String DATA_RESOURCE_ROOT = "/stream/data";
   protected static final String SUPERVISOR_SPEC_TEMPLATE_PATH =
       String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_SPEC_TEMPLATE_FILE);
+  protected static final String SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_PATH =
+          String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE);
+
   protected static final String SERIALIZER_SPEC_DIR = "serializer";
   protected static final String INPUT_FORMAT_SPEC_DIR = "input_format";
   protected static final String INPUT_ROW_PARSER_SPEC_DIR = "parser";
@@ -294,6 +298,70 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
     }
   }
 
+  /**
+   * Ingests data through a supervisor configured with a lag-based autoscaler
+   * (supervisor_with_autoscaler_spec_template.json), waits for the running task
+   * count to scale from 1 to 2, then verifies all written events were ingested.
+   */
+  protected void doTestIndexDataWithAutoscaler(@Nullable Boolean transactionEnabled) throws Exception
+  {
+    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
+            INPUT_FORMAT,
+            getResourceAsString(JSON_INPUT_FORMAT_PATH)
+    );
+    try (
+            final Closeable closer = createResourceCloser(generatedTestConfig);
+            final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
+    ) {
+      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
+              .apply(getResourceAsString(SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_PATH));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
+      LOG.info("Submitted supervisor");
+      // Start generating half of the data
+      int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
+              new JsonEventSerializer(jsonMapper),
+              EVENTS_PER_SECOND,
+              CYCLE_PADDING_MS
+      );
+      long numWritten = streamGenerator.run(
+              generatedTestConfig.getStreamName(),
+              streamEventWriter,
+              secondsToGenerateFirstRound,
+              FIRST_EVENT_TIME
+      );
+      // Verify supervisor becomes healthy
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
+              true,
+              10000,
+              30,
+              "Waiting for supervisor to be healthy"
+      );
+
+      // wait for autoScaling task numbers from 1 to 2.
+      ITRetryUtil.retryUntil(
+          () -> indexer.getRunningTasks().size() == 2,
+              true,
+              10000,
+              50,
+              "waiting for autoScaling task numbers from 1 to 2"
+      );
+
+      // Start generating remaining half of the data
+      numWritten += streamGenerator.run(
+              generatedTestConfig.getStreamName(),
+              streamEventWriter,
+              secondsToGenerateRemaining,
+              FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)
+      );
+
+      // Verify that supervisor can catch up with the stream
+      verifyIngestedData(generatedTestConfig, numWritten);
+    }
+  }
+
+
+
   protected void doTestIndexDataWithStreamReshardSplit(@Nullable Boolean transactionEnabled) throws Exception
   {
     // Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT * 2
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
index 2c648ea..967ff52 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
@@ -57,6 +57,16 @@ public class ITKafkaIndexingServiceNonTransactionalParallelizedTest extends Abst
    * and supervisor maintained and scoped within this test only
    */
   @Test
+  // Fixed doubled "With" in the method name; discovered via @Test annotation, no callers.
+  public void testKafkaIndexDataWithAutoscaler() throws Exception
+  {
+    doTestIndexDataWithAutoscaler(false);
+  }
+
+  /**
+   * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
+   * and supervisor maintained and scoped within this test only
+   */
+  @Test
   public void testKafkaIndexDataWithKafkaReshardSplit() throws Exception
   {
     doTestIndexDataWithStreamReshardSplit(false);
diff --git a/integration-tests/src/test/resources/stream/data/supervisor_with_autoscaler_spec_template.json b/integration-tests/src/test/resources/stream/data/supervisor_with_autoscaler_spec_template.json
new file mode 100644
index 0000000..f2fa828
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/supervisor_with_autoscaler_spec_template.json
@@ -0,0 +1,73 @@
+{
+  "type": "%%STREAM_TYPE%%",
+  "dataSchema": {
+    "dataSource": "%%DATASOURCE%%",
+    "parser": %%PARSER%%,
+    "timestampSpec": {
+      "column": "timestamp",
+      "format": "auto"
+    },
+    "dimensionsSpec": {
+      "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"],
+      "dimensionExclusions": [],
+      "spatialDimensions": []
+    },
+    "metricsSpec": [
+      {
+        "type": "count",
+        "name": "count"
+      },
+      {
+        "type": "doubleSum",
+        "name": "added",
+        "fieldName": "added"
+      },
+      {
+        "type": "doubleSum",
+        "name": "deleted",
+        "fieldName": "deleted"
+      },
+      {
+        "type": "doubleSum",
+        "name": "delta",
+        "fieldName": "delta"
+      }
+    ],
+    "granularitySpec": {
+      "type": "uniform",
+      "segmentGranularity": "MINUTE",
+      "queryGranularity": "NONE"
+    }
+  },
+  "tuningConfig": {
+    "type": "%%STREAM_TYPE%%",
+    "intermediatePersistPeriod": "PT30S",
+    "maxRowsPerSegment": 5000000,
+    "maxRowsInMemory": 500000
+  },
+  "ioConfig": {
+    "%%TOPIC_KEY%%": "%%TOPIC_VALUE%%",
+    "%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%,
+    "autoScalerConfig": {
+      "enableTaskAutoScaler": true,
+      "lagCollectionIntervalMillis": 500,
+      "lagCollectionRangeMillis": 500,
+      "scaleOutThreshold": 0,
+      "triggerScaleOutFractionThreshold": 0.0,
+      "scaleInThreshold": 1000000,
+      "triggerScaleInFractionThreshold": 0.9,
+      "scaleActionStartDelayMillis": 0,
+      "scaleActionPeriodMillis": 100,
+      "taskCountMax": 2,
+      "taskCountMin": 1,
+      "scaleInStep": 1,
+      "scaleOutStep": 2,
+      "minTriggerScaleActionFrequencyMillis": 600000
+    },
+    "taskCount": 1,
+    "replicas": 1,
+    "taskDuration": "PT30S",
+    "%%USE_EARLIEST_KEY%%": true,
+    "inputFormat" : %%INPUT_FORMAT%%
+  }
+}
diff --git a/pom.xml b/pom.xml
index 24f6993..157aec2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -958,6 +958,11 @@
                 <version>4.5.1</version>
             </dependency>
             <dependency>
+                <groupId>org.apache.commons</groupId>
+                <artifactId>commons-collections4</artifactId>
+                <version>4.2</version>
+            </dependency>
+            <dependency>
                 <groupId>io.dropwizard.metrics</groupId>
                 <artifactId>metrics-core</artifactId>
                 <version>${dropwizard.metrics.version}</version>
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index a2aa29f..b19aeaa 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
@@ -154,6 +155,18 @@ public class NoopSupervisorSpec implements SupervisorSpec
       {
 
       }
+
+      @Override
+      public LagStats computeLagStats()
+      {
+        return new LagStats(0, 0, 0);
+      }
+
+      @Override
+      public int getActiveTaskGroupsCount()
+      {
+        return -1;
+      }
     };
   }
 
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index 83c661a..66d1139 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.overlord.supervisor;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 
 import javax.annotation.Nullable;
 import java.util.Map;
@@ -64,4 +65,12 @@ public interface Supervisor
    * @param checkpointMetadata metadata for the sequence to currently checkpoint
    */
   void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);
+
+  /**
+   * Computes maxLag, totalLag and avgLag for the supervised stream.
+   * Only supported by Kafka ingestion so far; implementations that do not
+   * track lag may return null (as the test supervisors in this patch do).
+   */
+  LagStats computeLagStats();
+
+  /**
+   * @return the number of active task groups managed by this supervisor;
+   *         implementations without task groups return -1 (see NoopSupervisorSpec)
+   */
+  int getActiveTaskGroupsCount();
 }
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
index 041156c..9b44cd0 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.overlord.supervisor;
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 
 import java.util.List;
 
@@ -40,6 +41,11 @@ public interface SupervisorSpec
    */
   Supervisor createSupervisor();
 
+  /**
+   * Creates an autoscaler for the given supervisor. The default implementation
+   * returns null, meaning no task autoscaling is performed.
+   *
+   * @param supervisor the supervisor the autoscaler will act on
+   * @return a task autoscaler, or null if autoscaling is unsupported or disabled
+   */
+  default SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
+  {
+    return null;
+  }
+
   List<String> getDataSources();
 
   default SupervisorSpec createSuspendedSpec()
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
new file mode 100644
index 0000000..7b6e5fd
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.supervisor.autoscaler;
+
+/**
+ * Immutable holder for the lag statistics returned by
+ * {@code Supervisor#computeLagStats()}: the maximum, total, and average lag
+ * as computed by the supervisor implementation.
+ */
+public class LagStats
+{
+  private final long maxLag;
+  private final long totalLag;
+  private final long avgLag;
+
+  public LagStats(long maxLag, long totalLag, long avgLag)
+  {
+    this.maxLag = maxLag;
+    this.totalLag = totalLag;
+    this.avgLag = avgLag;
+  }
+
+  public long getMaxLag()
+  {
+    return maxLag;
+  }
+
+  public long getTotalLag()
+  {
+    return totalLag;
+  }
+
+  public long getAvgLag()
+  {
+    return avgLag;
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
new file mode 100644
index 0000000..c921e27
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.supervisor.autoscaler;
+
+/**
+ * Lifecycle contract for a supervisor task autoscaler. Implementations observe
+ * some signal (e.g. stream lag, as LagBasedAutoScaler does) and adjust the
+ * supervisor's task count accordingly.
+ */
+public interface SupervisorTaskAutoScaler
+{
+  // Begin autoscaling (e.g. schedule lag collection and scale actions).
+  void start();
+  // Stop autoscaling and release scheduling resources.
+  void stop();
+  // Discard any state collected so far (e.g. gathered lag metrics).
+  void reset();
+}
diff --git a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
new file mode 100644
index 0000000..fd5fac0
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing;
+
+import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.Callable;
+
+public class NoopSupervisorSpecTest
+{
+  /**
+   * A NoopSupervisorSpec must not create an autoscaler, and its supervisor must
+   * report no active task groups (-1) and all-zero lag statistics.
+   *
+   * Previous version wrapped the body in try/catch and asserted the caught
+   * exception was null, which swallowed the real stack trace on failure; it
+   * also declared an unused Callable and reversed assertEquals argument order.
+   */
+  @Test
+  public void testNoopSupervisorSpecWithAutoscaler()
+  {
+    NoopSupervisorSpec noopSupervisorSpec = new NoopSupervisorSpec(null, Collections.singletonList("datasource1"));
+    Supervisor supervisor = noopSupervisorSpec.createSupervisor();
+    SupervisorTaskAutoScaler autoscaler = noopSupervisorSpec.createAutoscaler(supervisor);
+    Assert.assertNull(autoscaler);
+
+    Assert.assertEquals(-1, supervisor.getActiveTaskGroupsCount());
+
+    LagStats lagStats = supervisor.computeLagStats();
+    Assert.assertEquals(0, lagStats.getTotalLag());
+    Assert.assertEquals(0, lagStats.getAvgLag());
+    Assert.assertEquals(0, lagStats.getMaxLag());
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java
index 9332220..5c40757 100644
--- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.druid.indexing.overlord.supervisor.Supervisor;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
 import org.apache.druid.indexing.overlord.supervisor.VersionedSupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.StringUtils;
 import org.junit.After;
@@ -186,6 +187,12 @@ public class SQLMetadataSupervisorManagerTest
     }
 
     @Override
+    public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
+    {
+      return null;
+    }
+
+    @Override
     public List<String> getDataSources()
     {
       return Collections.singletonList(dataSource);
diff --git a/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java b/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java
index e935a41..ffacfa2 100644
--- a/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java
+++ b/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.indexing.overlord.supervisor.Supervisor;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 
 import java.util.List;
 import java.util.Objects;
@@ -53,6 +54,12 @@ public class TestSupervisorSpec implements SupervisorSpec
   }
 
   @Override
+  public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
+  {
+    return null;
+  }
+
+  @Override
   public List<String> getDataSources()
   {
     return null;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org