Posted to commits@druid.apache.org by ji...@apache.org on 2020/04/24 19:58:21 UTC

[druid] branch 0.18.1 updated: [Backport] Integration tests for Kafka & Kinesis ingestion (#9745)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch 0.18.1
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/0.18.1 by this push:
     new 1a0071e  [Backport] Integration tests for Kafka & Kinesis ingestion (#9745)
1a0071e is described below

commit 1a0071e64ea2e05b8eafe3bb2c4099f28d020568
Author: Maytas Monsereenusorn <52...@users.noreply.github.com>
AuthorDate: Fri Apr 24 09:58:09 2020 -1000

    [Backport] Integration tests for Kafka & Kinesis ingestion (#9745)
    
    * backport Add Integration Test for functionality of kinesis ingestion (#9576)
    
    * backport Add integration tests for kafka ingestion (#9724)
    
    * resolve merge conflict
    
    * integration test cluster prop change to support parallel
---
 .travis.yml                                        |  33 +-
 .../extensions-core/kinesis-ingestion.md           |   2 +-
 integration-tests/README.md                        |  56 ++-
 .../docker/environment-configs/broker              |   2 +-
 .../docker/environment-configs/coordinator         |   2 +-
 .../docker/environment-configs/historical          |   2 +-
 .../docker/environment-configs/middlemanager       |   3 +-
 .../docker/environment-configs/overlord            |   2 +-
 .../{router => override-examples/kinesis}          |  15 +-
 .../docker/environment-configs/router              |   2 +-
 .../environment-configs/router-custom-check-tls    |   2 +-
 .../environment-configs/router-no-client-auth-tls  |   2 +-
 .../environment-configs/router-permissive-tls      |   2 +-
 .../tls/generate-server-certs-and-keystores.sh     |   6 +
 integration-tests/pom.xml                          | 102 +++--
 integration-tests/run_cluster.sh                   |  29 +-
 .../druid/testing/ConfigFileConfigProvider.java    |   8 +
 .../apache/druid/testing/DockerConfigProvider.java |   9 +
 .../druid/testing/IntegrationTestingConfig.java    |   2 +
 .../clients/OverlordResourceTestClient.java        | 105 +++++
 .../testing/guice/DruidTestModuleFactory.java      |   1 -
 .../testing/utils/DruidClusterAdminClient.java     | 162 ++++++++
 .../druid/testing/utils/KafkaAdminClient.java      | 108 +++++
 .../druid/testing/utils/KafkaEventWriter.java      | 123 ++++++
 .../druid/testing/utils/KinesisAdminClient.java    | 178 ++++++++
 .../druid/testing/utils/KinesisEventWriter.java    | 113 ++++++
 .../druid/testing/utils/StreamAdminClient.java     |  44 ++
 .../druid/testing/utils/StreamEventWriter.java     |  40 ++
 .../druid/testing/utils/StreamGenerator.java       |  31 ++
 .../utils/StreamVerifierEventGenerator.java        |  55 +++
 .../utils/StreamVerifierSyntheticEvent.java        | 104 +++++
 .../apache/druid/testing/utils/SuiteListener.java  |  65 +++
 .../testing/utils/SyntheticStreamGenerator.java    | 167 ++++++++
 .../utils/WikipediaStreamEventStreamGenerator.java |  60 +++
 .../java/org/testng/DruidTestRunnerFactory.java    | 141 -------
 .../java/org/apache/druid/tests/TestNGGroup.java   |  24 +-
 .../tests/indexer/AbstractKafkaIndexerTest.java    | 313 --------------
 .../indexer/AbstractKafkaIndexingServiceTest.java  | 159 ++++++++
 .../AbstractKinesisIndexingServiceTest.java        | 150 +++++++
 .../tests/indexer/AbstractStreamIndexingTest.java  | 450 +++++++++++++++++++++
 ...exingServiceNonTransactionalSerializedTest.java |  83 ++++
 .../tests/indexer/ITKafkaIndexingServiceTest.java  |  63 ---
 ...IndexingServiceTransactionalSerializedTest.java |  83 ++++
 .../ITKafkaIndexingServiceTransactionalTest.java   |  66 ---
 .../ITKinesisIndexingServiceSerializedTest.java    |  77 ++++
 ...ingServiceNonTransactionalParallelizedTest.java |  97 +++++
 ...dexingServiceTransactionalParallelizedTest.java |  97 +++++
 .../ITKinesisIndexingServiceParallelizedTest.java  | 101 +++++
 ...ndex_queries.json => stream_index_queries.json} |   0
 ...on => stream_supervisor_spec_input_format.json} |  12 +-
 ...n => stream_supervisor_spec_legacy_parser.json} |  12 +-
 integration-tests/src/test/resources/testng.xml    |  27 +-
 integration-tests/stop_cluster.sh                  |   6 +
 pom.xml                                            |   3 +-
 54 files changed, 2915 insertions(+), 686 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 4b5b2d3..e4cfc15 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -316,6 +316,30 @@ jobs:
       script: *run_integration_test
       after_failure: *integration_test_diags
 
+    - &integration_kafka_index_slow
+      name: "(Compile=openjdk8, Run=openjdk8) kafka index integration test slow"
+      jdk: openjdk8
+      services: *integration_test_services
+      env: TESTNG_GROUPS='-Dgroups=kafka-index-slow' JVM_RUNTIME='-Djvm.runtime=8'
+      script: *run_integration_test
+      after_failure: *integration_test_diags
+
+    - &integration_kafka_transactional_index
+      name: "(Compile=openjdk8, Run=openjdk8) transactional kafka index integration test"
+      jdk: openjdk8
+      services: *integration_test_services
+      env: TESTNG_GROUPS='-Dgroups=kafka-transactional-index' JVM_RUNTIME='-Djvm.runtime=8'
+      script: *run_integration_test
+      after_failure: *integration_test_diags
+
+    - &integration_kafka_transactional_index_slow
+      name: "(Compile=openjdk8, Run=openjdk8) transactional kafka index integration test slow"
+      jdk: openjdk8
+      services: *integration_test_services
+      env: TESTNG_GROUPS='-Dgroups=kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=8'
+      script: *run_integration_test
+      after_failure: *integration_test_diags
+
     - &integration_query
       name: "(Compile=openjdk8, Run=openjdk8) query integration test"
       jdk: openjdk8
@@ -344,7 +368,7 @@ jobs:
       name: "(Compile=openjdk8, Run=openjdk8) other integration test"
       jdk: openjdk8
       services: *integration_test_services
-      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion' JVM_RUNTIME='-Djvm.runtime=8'
+      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=8'
       script: *run_integration_test
       after_failure: *integration_test_diags
     # END - Integration tests for Compile with Java 8 and Run with Java 8
@@ -360,11 +384,6 @@ jobs:
       jdk: openjdk8
       env: TESTNG_GROUPS='-Dgroups=perfect-rollup-parallel-batch-index' JVM_RUNTIME='-Djvm.runtime=11'
 
-    - <<: *integration_kafka_index
-      name: "(Compile=openjdk8, Run=openjdk11) kafka index integration test"
-      jdk: openjdk8
-      env: TESTNG_GROUPS='-Dgroups=kafka-index' JVM_RUNTIME='-Djvm.runtime=11'
-
     - <<: *integration_query
       name: "(Compile=openjdk8, Run=openjdk11) query integration test"
       jdk: openjdk8
@@ -383,7 +402,7 @@ jobs:
     - <<: *integration_tests
       name: "(Compile=openjdk8, Run=openjdk11) other integration test"
       jdk: openjdk8
-      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion' JVM_RUNTIME='-Djvm.runtime=11'
+      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=11'
     # END - Integration tests for Compile with Java 8 and Run with Java 11
 
     - name: "security vulnerabilities"
diff --git a/docs/development/extensions-core/kinesis-ingestion.md b/docs/development/extensions-core/kinesis-ingestion.md
index bb54e4b..e0fcc2d 100644
--- a/docs/development/extensions-core/kinesis-ingestion.md
+++ b/docs/development/extensions-core/kinesis-ingestion.md
@@ -137,7 +137,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 | `indexSpecForIntermediatePersists`    |                | Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for po [...]
 | `reportParseExceptions`               | Boolean        | If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.                                                                                                                                                                                                                                                                                                     [...]
 | `handoffConditionTimeout`             | Long           | Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.                                                                                                                                                                                                                                                                                                                                                         [...]
-| `resetOffsetAutomatically`            | Boolean        | Controls behavior when Druid needs to read Kinesis messages that are no longer available.<br/><br/>If false, the exception will bubble up, which will cause your tasks to fail and ingestion to halt. If this occurs, manual intervention is required to correct the situation; potentially using the [Reset Supervisor API](../../operations/api-reference.html#supervisors). This mode is useful for production, since it will make you aware o [...]
+| `resetOffsetAutomatically`            | Boolean        | Controls behavior when Druid needs to read Kinesis messages that are no longer available.<br/><br/>If false, the exception will bubble up, which will cause your tasks to fail and ingestion to halt. If this occurs, manual intervention is required to correct the situation; potentially using the [Reset Supervisor API](../../operations/api-reference.html#supervisors). This mode is useful for production, since it will make you aware o [...]
 | `skipSequenceNumberAvailabilityCheck` | Boolean        | Whether to enable checking if the current sequence number is still available in a particular Kinesis shard. If set to false, the indexing task will attempt to reset the current sequence number (or not), depending on the value of `resetOffsetAutomatically`.                                                                                                                                                                                  [...]
 | `workerThreads`                       | Integer        | The number of threads that will be used by the supervisor for asynchronous operations.                                                                                                                                                                                                                                                                                                                                                            [...]
 | `chatThreads`                         | Integer        | The number of threads that will be used for communicating with indexing tasks.                                                                                                                                                                                                                                                                                                                                                                    [...]
diff --git a/integration-tests/README.md b/integration-tests/README.md
index 03c6851..d936535 100644
--- a/integration-tests/README.md
+++ b/integration-tests/README.md
@@ -68,6 +68,32 @@ can either be 8 or 11.
 Druid's configuration (using Docker) can be overridden by providing -Doverride.config.path=<PATH_TO_FILE>. 
 The file must contain one property per line, the key must start with `druid_`, and the format should be snake case. 
 
+## Tips & tricks for debugging and developing integration tests
+
+### Useful mvn command flags
+
+- -Dskip.start.docker=true to skip starting the Docker containers. This can save ~3 minutes by skipping building and
+bringing up the Docker containers (Druid, Kafka, Hadoop, MySQL, ZooKeeper, etc.). Please make sure that you actually do have
+these containers already running if using this flag. Additionally, please make sure that the running containers
+are in the same state that the setup script (run_cluster.sh) would have brought them up in (see the example command below).
+- -Dskip.stop.docker=true to skip stopping and tearing down the Docker containers. This can be useful for further
+debugging after the integration tests have finished running.
+
+### Debugging Druid while running tests
+
+For your convenience, Druid processes running inside Docker have debugging enabled, and the following ports are
+exposed so that you can attach a remote debugger (for example, via IntelliJ IDEA's Remote Configuration):
+
+- Overlord process at port 5009
+- MiddleManager process at port 5008
+- Historical process at port 5007
+- Coordinator process at port 5006
+- Broker process at port 5005
+- Router process at port 5004
+- Router with custom-check TLS process at port 5003
+- Router with no-client-auth TLS process at port 5002
+- Router with permissive TLS process at port 5001
+
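
As referenced above, a sketch of how these flags might be combined while iterating on a test (the profile and group names are illustrative; see the rest of this README for the full command):

    # First run: leave the containers up after the tests finish.
    mvn verify -P integration-tests -Dgroups=kafka-index -Dskip.stop.docker=true
    # Later runs: reuse the already-running containers and keep them up.
    mvn verify -P integration-tests -Dgroups=kafka-index -Dskip.start.docker=true -Dskip.stop.docker=true
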
 Running Tests Using A Quickstart Cluster
 -------------------
 
@@ -145,20 +171,26 @@ The integration test that indexes from Cloud or uses Cloud as deep storage is no
 of the integration test run discussed above. Running these tests requires the user to provide
 their own Cloud. 
 
-Currently, the integration test supports Google Cloud Storage, Amazon S3, and Microsoft Azure.
-These can be run by providing "gcs-deep-storage", "s3-deep-storage", or "azure-deep-storage" 
-to -Dgroups for Google Cloud Storage, Amazon S3, and Microsoft Azure respectively. Note that only
+Currently, the integration test supports Amazon Kinesis, Google Cloud Storage, Amazon S3, and Microsoft Azure.
+These can be run by providing "kinesis-index", "gcs-deep-storage", "s3-deep-storage", or "azure-deep-storage" 
+to -Dgroups for Amazon Kinesis, Google Cloud Storage, Amazon S3, and Microsoft Azure respectively. Note that only
 one group should be run per mvn command.
 
-In addition to specifying the -Dgroups to mvn command, the following will need to be provided:
+For all of the Cloud integration tests, the following will also need to be provided:
+1) Provide -Doverride.config.path=<PATH_TO_FILE> with your Cloud credentials/configs set. See
+integration-tests/docker/environment-configs/override-examples/ directory for env vars to provide for each Cloud.
+
+For Amazon Kinesis, the following will also need to be provided:
+1) Provide -Ddruid.test.config.streamEndpoint=<STREAM_ENDPOINT> with the endpoint of your stream set. 
+For example, kinesis.us-east-1.amazonaws.com
+
+For Google Cloud Storage, Amazon S3, and Microsoft Azure, the following will also need to be provided:
 1) Set the bucket and path for your test data. This can be done by setting -Ddruid.test.config.cloudBucket and 
 -Ddruid.test.config.cloudPath in the mvn command or setting "cloud_bucket" and "cloud_path" in the config file.
 2) Copy wikipedia_index_data1.json, wikipedia_index_data2.json, and wikipedia_index_data3.json 
 located in integration-tests/src/test/resources/data/batch_index to your Cloud storage at the location set in step 1.
-3) Provide -Doverride.config.path=<PATH_TO_FILE> with your Cloud credentials/configs set. See
-integration-tests/docker/environment-configs/override-examples/ directory for env vars to provide for each Cloud storage.
 
-For running Google Cloud Storage, in addition to the above, you will also have to:
+For Google Cloud Storage, in addition to the above, you will also have to:
 1) Provide -Dresource.file.dir.path=<PATH_TO_FOLDER> with folder that contains GOOGLE_APPLICATION_CREDENTIALS file
 
 For example, to run integration test for Google Cloud Storage:
@@ -275,3 +307,13 @@ This will tell the test framework that the test class needs to be constructed us
 2) FromFileTestQueryHelper - reads queries with expected results from file and executes them and verifies the results using ResultVerifier
 
 Refer to ITIndexerTest as an example of how to use dependency injection.
+
+### Running test methods in parallel
+By default, test methods in a test class run sequentially, one at a time. Test methods for a given test
+class can be set to run in parallel (multiple test methods of the class running at the same time) by excluding
+the class/package from the "AllSerializedTests" test tag section and including it in the "AllParallelizedTests"
+test tag section in integration-tests/src/test/resources/testng.xml (see the sketch after this diff).
+Please be mindful when adding tests to the "AllParallelizedTests" test tag that they must be able to run in parallel
+with other tests from the same class at the same time: a test must not modify, restart, or stop the Druid cluster or
+other dependency containers, must not use excessive memory that would starve other concurrent tasks, and must not
+modify or use any task, supervisor, or datasource it did not create.
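
As referenced in the parallel-test note above, the serialized vs. parallelized split lives in integration-tests/src/test/resources/testng.xml. A minimal, hypothetical sketch of a parallelized test tag (only the two tag names come from this patch; the thread count and listed class are illustrative):

    <test name="AllParallelizedTests" parallel="methods" thread-count="2">
      <classes>
        <class name="org.apache.druid.tests.indexer.ITKinesisIndexingServiceParallelizedTest"/>
      </classes>
    </test>
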
diff --git a/integration-tests/docker/environment-configs/broker b/integration-tests/docker/environment-configs/broker
index b8794d4..ae2d561 100644
--- a/integration-tests/docker/environment-configs/broker
+++ b/integration-tests/docker/environment-configs/broker
@@ -21,7 +21,7 @@ DRUID_SERVICE=broker
 DRUID_LOG_PATH=/shared/logs/broker.log
 
 # JAVA OPTS
-SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseG1GC
+SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005
 
 # Druid configs
 druid_processing_buffer_sizeBytes=25000000
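
The -agentlib:jdwp flag added above backs the broker's debug port from the README list; with the container running, a command-line debugger can be attached roughly like this (host/port per the port mapping in run_cluster.sh):

    jdb -attach localhost:5005
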
diff --git a/integration-tests/docker/environment-configs/coordinator b/integration-tests/docker/environment-configs/coordinator
index de779f6..6bd0260 100644
--- a/integration-tests/docker/environment-configs/coordinator
+++ b/integration-tests/docker/environment-configs/coordinator
@@ -21,7 +21,7 @@ DRUID_SERVICE=coordinator
 DRUID_LOG_PATH=/shared/logs/coordinator.log
 
 # JAVA OPTS
-SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -Xms128m -XX:+UseG1GC
+SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -Xms128m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006
 
 # Druid configs
 druid_metadata_storage_type=mysql
diff --git a/integration-tests/docker/environment-configs/historical b/integration-tests/docker/environment-configs/historical
index 1f74b0c..a2fcf33 100644
--- a/integration-tests/docker/environment-configs/historical
+++ b/integration-tests/docker/environment-configs/historical
@@ -21,7 +21,7 @@ DRUID_SERVICE=historical
 DRUID_LOG_PATH=/shared/logs/historical.log
 
 # JAVA OPTS
-SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseG1GC
+SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5007
 
 # Druid configs
 druid_processing_buffer_sizeBytes=25000000
diff --git a/integration-tests/docker/environment-configs/middlemanager b/integration-tests/docker/environment-configs/middlemanager
index c37c3fe..0ca4dbc 100644
--- a/integration-tests/docker/environment-configs/middlemanager
+++ b/integration-tests/docker/environment-configs/middlemanager
@@ -21,7 +21,7 @@ DRUID_SERVICE=middleManager
 DRUID_LOG_PATH=/shared/logs/middlemanager.log
 
 # JAVA OPTS
-SERVICE_DRUID_JAVA_OPTS=-server -Xmx64m -Xms64m -XX:+UseG1GC
+SERVICE_DRUID_JAVA_OPTS=-server -Xmx64m -Xms64m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5008
 
 # Druid configs
 druid_server_http_numThreads=100
@@ -37,3 +37,4 @@ druid_indexer_task_chathandler_type=announce
 druid_auth_basic_common_cacheDirectory=/tmp/authCache/middleManager
 druid_startup_logging_logProperties=true
 druid_server_https_crlPath=/tls/revocations.crl
+druid_worker_capacity=20
\ No newline at end of file
diff --git a/integration-tests/docker/environment-configs/overlord b/integration-tests/docker/environment-configs/overlord
index d86eb19..ebb3d5b 100644
--- a/integration-tests/docker/environment-configs/overlord
+++ b/integration-tests/docker/environment-configs/overlord
@@ -21,7 +21,7 @@ DRUID_SERVICE=overlord
 DRUID_LOG_PATH=/shared/logs/overlord.log
 
 # JAVA OPTS
-SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -Xms128m -XX:+UseG1GC
+SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -Xms128m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5009
 
 # Druid configs
 druid_metadata_storage_type=mysql
diff --git a/integration-tests/docker/environment-configs/router b/integration-tests/docker/environment-configs/override-examples/kinesis
similarity index 73%
copy from integration-tests/docker/environment-configs/router
copy to integration-tests/docker/environment-configs/override-examples/kinesis
index b3636b7..33d4bac 100644
--- a/integration-tests/docker/environment-configs/router
+++ b/integration-tests/docker/environment-configs/override-examples/kinesis
@@ -16,14 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-
-DRUID_SERVICE=router
-DRUID_LOG_PATH=/shared/logs/router.log
-
-# JAVA OPTS
-SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseG1GC
-
-# Druid configs
-druid_auth_basic_common_cacheDirectory=/tmp/authCache/router
-druid_sql_avatica_enable=true
-druid_server_https_crlPath=/tls/revocations.crl
+druid_kinesis_accessKey=<OVERRIDE_THIS>
+druid_kinesis_secretKey=<OVERRIDE_THIS>
+AWS_REGION=<OVERRIDE_THIS>
+druid_extensions_loadList=["druid-kinesis-indexing-service"]
\ No newline at end of file
diff --git a/integration-tests/docker/environment-configs/router b/integration-tests/docker/environment-configs/router
index b3636b7..f25b23e 100644
--- a/integration-tests/docker/environment-configs/router
+++ b/integration-tests/docker/environment-configs/router
@@ -21,7 +21,7 @@ DRUID_SERVICE=router
 DRUID_LOG_PATH=/shared/logs/router.log
 
 # JAVA OPTS
-SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseG1GC
+SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5004
 
 # Druid configs
 druid_auth_basic_common_cacheDirectory=/tmp/authCache/router
diff --git a/integration-tests/docker/environment-configs/router-custom-check-tls b/integration-tests/docker/environment-configs/router-custom-check-tls
index 07b0724..ece8531 100644
--- a/integration-tests/docker/environment-configs/router-custom-check-tls
+++ b/integration-tests/docker/environment-configs/router-custom-check-tls
@@ -21,7 +21,7 @@ DRUID_SERVICE=router
 DRUID_LOG_PATH=/shared/logs/router-custom-check-tls.log
 
 # JAVA OPTS
-SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails
+SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5003
 
 # Druid configs
 druid_plaintextPort=8891
diff --git a/integration-tests/docker/environment-configs/router-no-client-auth-tls b/integration-tests/docker/environment-configs/router-no-client-auth-tls
index bc6959c..4b703ba 100644
--- a/integration-tests/docker/environment-configs/router-no-client-auth-tls
+++ b/integration-tests/docker/environment-configs/router-no-client-auth-tls
@@ -21,7 +21,7 @@ DRUID_SERVICE=router
 DRUID_LOG_PATH=/shared/logs/router-no-client-auth-tls.log
 
 # JAVA OPTS
-SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails
+SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5002
 
 # Druid configs
 druid_plaintextPort=8890
diff --git a/integration-tests/docker/environment-configs/router-permissive-tls b/integration-tests/docker/environment-configs/router-permissive-tls
index b4beb9f..41346cb 100644
--- a/integration-tests/docker/environment-configs/router-permissive-tls
+++ b/integration-tests/docker/environment-configs/router-permissive-tls
@@ -21,7 +21,7 @@ DRUID_SERVICE=router
 DRUID_LOG_PATH=/shared/logs/router-permissive-tls.log
 
 # JAVA OPTS
-SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails
+SERVICE_DRUID_JAVA_OPTS=-server -Xmx128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5001
 
 # Druid configs
 druid_plaintextPort=8889
diff --git a/integration-tests/docker/tls/generate-server-certs-and-keystores.sh b/integration-tests/docker/tls/generate-server-certs-and-keystores.sh
index e26cdac..28bf09a 100755
--- a/integration-tests/docker/tls/generate-server-certs-and-keystores.sh
+++ b/integration-tests/docker/tls/generate-server-certs-and-keystores.sh
@@ -17,6 +17,12 @@
 
 cd /tls
 
+FILE_CHECK_IF_RAN=/tls/server.key
+if [ -f "$FILE_CHECK_IF_RAN" ]; then
+  echo "Using existing certs/keys since /tls/server.key exists. Skipping generation (most likely this script was ran previously). To generate new certs, delete /tls/server.key"
+  exit
+fi
+
 rm -f cert_db.txt
 touch cert_db.txt
 
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index cc41c03..5971a34 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -33,6 +33,32 @@
 
     <dependencies>
         <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>amazon-kinesis-producer</artifactId>
+            <version>0.13.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-kinesis</artifactId>
+            <version>${aws.sdk.version}</version>
+            <exclusions>
+                <!-- aws-java-sdk-core is provided by Druid core. -->
+                <exclusion>
+                    <groupId>com.amazonaws</groupId>
+                    <artifactId>aws-java-sdk-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-core</artifactId>
+            <version>${aws.sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.druid</groupId>
             <artifactId>druid-core</artifactId>
             <version>${project.parent.version}</version>
@@ -51,6 +77,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.druid.extensions</groupId>
+            <artifactId>druid-kinesis-indexing-service</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.druid.extensions</groupId>
             <artifactId>druid-azure-extensions</artifactId>
             <version>${project.parent.version}</version>
             <scope>runtime</scope>
@@ -190,11 +222,6 @@
             <artifactId>guava</artifactId>
         </dependency>
         <dependency>
-            <groupId>com.101tec</groupId>
-            <artifactId>zkclient</artifactId>
-            <version>0.10</version>
-        </dependency>
-        <dependency>
             <groupId>javax.validation</groupId>
             <artifactId>validation-api</artifactId>
         </dependency>
@@ -207,6 +234,38 @@
             <groupId>com.google.code.findbugs</groupId>
             <artifactId>jsr305</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.github.docker-java</groupId>
+            <artifactId>docker-java</artifactId>
+            <version>3.2.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.github.docker-java</groupId>
+                    <artifactId>docker-java-transport-jersey</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty-transport-native-kqueue</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.github.docker-java</groupId>
+            <artifactId>docker-java-transport-netty</artifactId>
+            <version>3.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.github.docker-java</groupId>
+            <artifactId>docker-java-api</artifactId>
+            <version>3.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-transport-native-kqueue</artifactId>
+            <version>${netty4.version}</version>
+            <classifier>osx-x86_64</classifier>
+            <scope>runtime</scope>
+        </dependency>
 
         <!-- Tests -->
         <dependency>
@@ -228,21 +287,6 @@
             <artifactId>easymock</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.12</artifactId>
-            <version>${apache.kafka.version}</version>
-            <exclusions>
-                <exclusion>
-                    <artifactId>log4j</artifactId>
-                    <groupId>log4j</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>slf4j-log4j12</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
     </dependencies>
 
     <build>
@@ -293,6 +337,8 @@
                 <start.hadoop.docker>false</start.hadoop.docker>
                 <override.config.path />
                 <resource.file.dir.path />
+                <skip.start.docker>false</skip.start.docker>
+                <skip.stop.docker>false</skip.stop.docker>
             </properties>
             <build>
                 <plugins>
@@ -309,6 +355,7 @@
                                 <configuration>
                                     <environmentVariables>
                                     <DRUID_INTEGRATION_TEST_START_HADOOP_DOCKER>${start.hadoop.docker}</DRUID_INTEGRATION_TEST_START_HADOOP_DOCKER>
+                                    <DRUID_INTEGRATION_TEST_SKIP_START_DOCKER>${skip.start.docker}</DRUID_INTEGRATION_TEST_SKIP_START_DOCKER>
                                     <DRUID_INTEGRATION_TEST_JVM_RUNTIME>${jvm.runtime}</DRUID_INTEGRATION_TEST_JVM_RUNTIME>
                                     <DRUID_INTEGRATION_TEST_GROUP>${groups}</DRUID_INTEGRATION_TEST_GROUP>
                                     <DRUID_INTEGRATION_TEST_OVERRIDE_CONFIG_PATH>${override.config.path}</DRUID_INTEGRATION_TEST_OVERRIDE_CONFIG_PATH>
@@ -324,6 +371,9 @@
                                 </goals>
                                 <phase>post-integration-test</phase>
                                 <configuration>
+                                    <environmentVariables>
+                                        <DRUID_INTEGRATION_TEST_SKIP_STOP_DOCKER>${skip.stop.docker}</DRUID_INTEGRATION_TEST_SKIP_STOP_DOCKER>
+                                    </environmentVariables>
                                     <executable>${project.basedir}/stop_cluster.sh</executable>
                                 </configuration>
                             </execution>
@@ -343,12 +393,6 @@
                             </execution>
                         </executions>
                         <configuration>
-                            <properties>
-                                <property>
-                                    <name>testrunfactory</name>
-                                    <value>org.testng.DruidTestRunnerFactory</value>
-                                </property>
-                            </properties>
                             <argLine>
                                 -Duser.timezone=UTC
                                 -Dfile.encoding=UTF-8
@@ -401,12 +445,6 @@
                             </execution>
                         </executions>
                         <configuration>
-                            <properties>
-                                <property>
-                                    <name>testrunfactory</name>
-                                    <value>org.testng.DruidTestRunnerFactory</value>
-                                </property>
-                            </properties>
                             <argLine>
                                 -Duser.timezone=UTC
                                 -Dfile.encoding=UTF-8
diff --git a/integration-tests/run_cluster.sh b/integration-tests/run_cluster.sh
index 87fd854..1ee03b2 100755
--- a/integration-tests/run_cluster.sh
+++ b/integration-tests/run_cluster.sh
@@ -14,6 +14,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+# Skip starting docker if flag set (For use during development)
+if [ -n "$DRUID_INTEGRATION_TEST_SKIP_START_DOCKER" ] && [ "$DRUID_INTEGRATION_TEST_SKIP_START_DOCKER" == true ]
+  then
+    exit 0
+  fi
+
 # Cleanup old images/containers
 {
   for node in druid-historical druid-coordinator druid-overlord druid-router druid-router-permissive-tls druid-router-no-client-auth-tls druid-router-custom-check-tls druid-broker druid-middlemanager druid-zookeeper-kafka druid-metadata-storage druid-it-hadoop;
@@ -71,6 +77,9 @@
   $ For druid-hdfs-storage
   mkdir -p $SHARED_DIR/docker/extensions/druid-hdfs-storage
   mv $SHARED_DIR/docker/lib/druid-hdfs-storage-* $SHARED_DIR/docker/extensions/druid-hdfs-storage
+  # For druid-kinesis-indexing-service
+  mkdir -p $SHARED_DIR/docker/extensions/druid-kinesis-indexing-service
+  mv $SHARED_DIR/docker/lib/druid-kinesis-indexing-service-* $SHARED_DIR/docker/extensions/druid-kinesis-indexing-service
 
   # Pull Hadoop dependency if needed
   if [ -n "$DRUID_INTEGRATION_TEST_START_HADOOP_DOCKER" ] && [ "$DRUID_INTEGRATION_TEST_START_HADOOP_DOCKER" == true ]
@@ -200,32 +209,32 @@ fi
   docker run -d --privileged --net druid-it-net --ip 172.172.172.2 ${COMMON_ENV} --name druid-zookeeper-kafka -p 2181:2181 -p 9092:9092 -p 9093:9093 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/zookeeper.conf:$SUPERVISORDIR/zookeeper.conf -v $SERVICE_SUPERVISORDS_DIR/kafka.conf:$SUPERVISORDIR/kafka.conf druid/cluster
 
   # Start MYSQL
-  docker run -d --privileged --net druid-it-net --ip 172.172.172.3 ${COMMON_ENV} --name druid-metadata-storage -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/metadata-storage.conf:$SUPERVISORDIR/metadata-storage.conf druid/cluster
+  docker run -d --privileged --net druid-it-net --ip 172.172.172.3 ${COMMON_ENV} --name druid-metadata-storage -p 3306:3306 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/metadata-storage.conf:$SUPERVISORDIR/metadata-storage.conf druid/cluster
 
   # Start Overlord
-  docker run -d --privileged --net druid-it-net --ip 172.172.172.4 ${COMMON_ENV} ${OVERLORD_ENV} ${OVERRIDE_ENV} --name druid-overlord -p 8090:8090 -p 8290:8290 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
+  docker run -d --privileged --net druid-it-net --ip 172.172.172.4 ${COMMON_ENV} ${OVERLORD_ENV} ${OVERRIDE_ENV} --name druid-overlord -p 5009:5009 -p 8090:8090 -p 8290:8290 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
 
   # Start Coordinator
-  docker run -d --privileged --net druid-it-net --ip 172.172.172.5 ${COMMON_ENV} ${COORDINATOR_ENV} ${OVERRIDE_ENV} --name druid-coordinator -p 8081:8081 -p 8281:8281 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-overlord:druid-overlord --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
+  docker run -d --privileged --net druid-it-net --ip 172.172.172.5 ${COMMON_ENV} ${COORDINATOR_ENV} ${OVERRIDE_ENV} --name druid-coordinator -p 5006:5006 -p 8081:8081 -p 8281:8281 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-overlord:druid-overlord --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
 
   # Start Historical
-  docker run -d --privileged --net druid-it-net --ip 172.172.172.6 ${COMMON_ENV} ${HISTORICAL_ENV} ${OVERRIDE_ENV} --name druid-historical -p 8083:8083 -p 8283:8283 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
+  docker run -d --privileged --net druid-it-net --ip 172.172.172.6 ${COMMON_ENV} ${HISTORICAL_ENV} ${OVERRIDE_ENV} --name druid-historical -p 5007:5007 -p 8083:8083 -p 8283:8283 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
 
   # Start Middlemanger
-  docker run -d --privileged --net druid-it-net --ip 172.172.172.7 ${COMMON_ENV} ${MIDDLEMANAGER_ENV} ${OVERRIDE_ENV} --name druid-middlemanager -p 8091:8091 -p 8291:8291 -p 8100:8100 -p 8101:8101 -p 8102:8102 -p 8103:8103 -p 8104:8104 -p 8105:8105 -p 8300:8300 -p 8301:8301 -p 8302:8302 -p 8303:8303 -p 8304:8304 -p 8305:8305 -v $RESOURCEDIR:/resources -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafk [...]
+  docker run -d --privileged --net druid-it-net --ip 172.172.172.7 ${COMMON_ENV} ${MIDDLEMANAGER_ENV} ${OVERRIDE_ENV} --name druid-middlemanager -p 5008:5008 -p 8091:8091 -p 8291:8291 -p 8100:8100 -p 8101:8101 -p 8102:8102 -p 8103:8103 -p 8104:8104 -p 8105:8105 -p 8300:8300 -p 8301:8301 -p 8302:8302 -p 8303:8303 -p 8304:8304 -p 8305:8305 -v $RESOURCEDIR:/resources -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-z [...]
 
   # Start Broker
-  docker run -d --privileged --net druid-it-net --ip 172.172.172.8 ${COMMON_ENV} ${BROKER_ENV} ${OVERRIDE_ENV} --name druid-broker -p 8082:8082 -p 8282:8282 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-middlemanager:druid-middlemanager --link druid-historical:druid-historical druid/cluster
+  docker run -d --privileged --net druid-it-net --ip 172.172.172.8 ${COMMON_ENV} ${BROKER_ENV} ${OVERRIDE_ENV} --name druid-broker -p 5005:5005 -p 8082:8082 -p 8282:8282 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-middlemanager:druid-middlemanager --link druid-historical:druid-historical druid/cluster
 
   # Start Router
-  docker run -d --privileged --net druid-it-net --ip 172.172.172.9 ${COMMON_ENV} ${ROUTER_ENV} ${OVERRIDE_ENV} --name druid-router -p 8888:8888 -p 9088:9088 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
+  docker run -d --privileged --net druid-it-net --ip 172.172.172.9 ${COMMON_ENV} ${ROUTER_ENV} ${OVERRIDE_ENV} --name druid-router -p 8888:8888 -p 5004:5004 -p 9088:9088 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
 
   # Start Router with permissive TLS settings (client auth enabled, no hostname verification, no revocation check)
-  docker run -d --privileged --net druid-it-net --ip 172.172.172.10 ${COMMON_ENV} ${ROUTER_PERMISSIVE_TLS_ENV} ${OVERRIDE_ENV} --name druid-router-permissive-tls -p 8889:8889 -p 9089:9089 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
+  docker run -d --privileged --net druid-it-net --ip 172.172.172.10 ${COMMON_ENV} ${ROUTER_PERMISSIVE_TLS_ENV} ${OVERRIDE_ENV} --name druid-router-permissive-tls -p 5001:5001 -p 8889:8889 -p 9089:9089 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
 
   # Start Router with TLS but no client auth
-  docker run -d --privileged --net druid-it-net --ip 172.172.172.11 ${COMMON_ENV} ${ROUTER_NO_CLIENT_AUTH_TLS_ENV} ${OVERRIDE_ENV} --name druid-router-no-client-auth-tls -p 8890:8890 -p 9090:9090 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
+  docker run -d --privileged --net druid-it-net --ip 172.172.172.11 ${COMMON_ENV} ${ROUTER_NO_CLIENT_AUTH_TLS_ENV} ${OVERRIDE_ENV} --name druid-router-no-client-auth-tls -p 5002:5002 -p 8890:8890 -p 9090:9090 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
 
   # Start Router with custom TLS cert checkers
-  docker run -d --privileged --net druid-it-net --ip 172.172.172.12 ${COMMON_ENV} ${ROUTER_CUSTOM_CHECK_TLS_ENV} ${OVERRIDE_ENV} --hostname druid-router-custom-check-tls --name druid-router-custom-check-tls -p 8891:8891 -p 9091:9091 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
+  docker run -d --privileged --net druid-it-net --ip 172.172.172.12 ${COMMON_ENV} ${ROUTER_CUSTOM_CHECK_TLS_ENV} ${OVERRIDE_ENV} --hostname druid-router-custom-check-tls --name druid-router-custom-check-tls -p 5003:5003 -p 8891:8891 -p 9091:9091 -v $SHARED_DIR:/shared -v $SERVICE_SUPERVISORDS_DIR/druid.conf:$SUPERVISORDIR/druid.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
 }
\ No newline at end of file
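
The additional -p mappings above publish the debug ports configured in the environment configs; once the cluster is up, they can be sanity-checked with standard Docker commands (container names as used in this script):

    docker port druid-broker 5005
    docker port druid-overlord 5009
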
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java b/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java
index 769b712..1fec42c 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java
@@ -57,6 +57,7 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
   private String password;
   private String cloudBucket;
   private String cloudPath;
+  private String streamEndpoint;
 
   @JsonCreator
   ConfigFileConfigProvider(@JsonProperty("configFile") String configFile)
@@ -192,6 +193,7 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
 
     cloudBucket = props.get("cloud_bucket");
     cloudPath = props.get("cloud_path");
+    streamEndpoint = props.get("stream_endpoint");
 
     LOG.info("router: [%s], [%s]", routerUrl, routerTLSUrl);
     LOG.info("broker: [%s], [%s]", brokerUrl, brokerTLSUrl);
@@ -355,6 +357,12 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
       }
 
       @Override
+      public String getStreamEndpoint()
+      {
+        return streamEndpoint;
+      }
+
+      @Override
       public Map<String, String> getProperties()
       {
         return props;
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
index 83d80e7..e33e121 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java
@@ -46,6 +46,9 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
   @JsonProperty
   private String cloudBucket;
 
+  @JsonProperty
+  private String streamEndpoint;
+
   @Override
   public IntegrationTestingConfig get()
   {
@@ -229,6 +232,12 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider
       {
         return cloudPath;
       }
+
+      @Override
+      public String getStreamEndpoint()
+      {
+        return streamEndpoint;
+      }
     };
   }
 }
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java b/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java
index d178f90..17f2aab 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java
@@ -88,4 +88,6 @@ public interface IntegrationTestingConfig
   String getCloudBucket();
 
   String getCloudPath();
+
+  String getStreamEndpoint();
 }
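
Tying the new streamEndpoint setting back to the README section above, a Kinesis integration test run might be invoked roughly like this (the profile name, override file path, and region are illustrative):

    mvn verify -P integration-tests -Dgroups=kinesis-index \
      -Doverride.config.path=/path/to/override-examples/kinesis \
      -Ddruid.test.config.streamEndpoint=kinesis.us-east-1.amazonaws.com
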
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index 6ad6411..7995595 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -26,6 +26,7 @@ import com.google.inject.Inject;
 import org.apache.druid.client.indexing.TaskStatusResponse;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.StringUtils;
@@ -42,6 +43,7 @@ import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -155,6 +157,15 @@ public class OverlordResourceTestClient
     return getTasks(StringUtils.format("tasks?state=complete&datasource=%s", StringUtils.urlEncode(dataSource)));
   }
 
+  public List<TaskResponseObject> getUncompletedTasksForDataSource(final String dataSource)
+  {
+    List<TaskResponseObject> uncompletedTasks = new ArrayList<>();
+    uncompletedTasks.addAll(getTasks(StringUtils.format("tasks?state=pending&datasource=%s", StringUtils.urlEncode(dataSource))));
+    uncompletedTasks.addAll(getTasks(StringUtils.format("tasks?state=running&datasource=%s", StringUtils.urlEncode(dataSource))));
+    uncompletedTasks.addAll(getTasks(StringUtils.format("tasks?state=waiting&datasource=%s", StringUtils.urlEncode(dataSource))));
+    return uncompletedTasks;
+  }
+
   private List<TaskResponseObject> getTasks(String identifier)
   {
     try {
@@ -287,6 +298,100 @@ public class OverlordResourceTestClient
     }
   }
 
+  public SupervisorStateManager.BasicState getSupervisorStatus(String id)
+  {
+    try {
+      StatusResponseHolder response = httpClient.go(
+          new Request(
+              HttpMethod.GET,
+              new URL(StringUtils.format(
+                  "%ssupervisor/%s/status",
+                  getIndexerURL(),
+                  StringUtils.urlEncode(id)
+              ))
+          ),
+          StatusResponseHandler.getInstance()
+      ).get();
+      if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+        throw new ISE(
+            "Error while getting supervisor status, response [%s %s]",
+            response.getStatus(),
+            response.getContent()
+        );
+      }
+      Map<String, Object> responseData = jsonMapper.readValue(
+          response.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+      );
+
+      Map<String, Object> payload = jsonMapper.convertValue(
+          responseData.get("payload"),
+          JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+      );
+      String state = (String) payload.get("state");
+      LOG.info("Supervisor id[%s] has state [%s]", id, state);
+      return SupervisorStateManager.BasicState.valueOf(state);
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void suspendSupervisor(String id)
+  {
+    try {
+      StatusResponseHolder response = httpClient.go(
+          new Request(
+              HttpMethod.POST,
+              new URL(StringUtils.format(
+                  "%ssupervisor/%s/suspend",
+                  getIndexerURL(),
+                  StringUtils.urlEncode(id)
+              ))
+          ),
+          StatusResponseHandler.getInstance()
+      ).get();
+      if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+        throw new ISE(
+            "Error while suspending supervisor, response [%s %s]",
+            response.getStatus(),
+            response.getContent()
+        );
+      }
+      LOG.info("Suspended supervisor with id[%s]", id);
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void resumeSupervisor(String id)
+  {
+    try {
+      StatusResponseHolder response = httpClient.go(
+          new Request(
+              HttpMethod.POST,
+              new URL(StringUtils.format(
+                  "%ssupervisor/%s/resume",
+                  getIndexerURL(),
+                  StringUtils.urlEncode(id)
+              ))
+          ),
+          StatusResponseHandler.getInstance()
+      ).get();
+      if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+        throw new ISE(
+            "Error while resuming supervisor, response [%s %s]",
+            response.getStatus(),
+            response.getContent()
+        );
+      }
+      LOG.info("Resumed supervisor with id[%s]", id);
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   private StatusResponseHolder makeRequest(HttpMethod method, String url)
   {
     try {
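
A hypothetical caller of the new supervisor helpers, e.g. from a streaming ingestion test: wait for a supervisor to report RUNNING, then suspend and later resume it. The method and variable names below are illustrative; ITRetryUtil is the retry helper used elsewhere in this patch.

    // Sketch only: "indexer" is an injected OverlordResourceTestClient; the supervisor id is illustrative.
    private void suspendAndResumeSupervisor(OverlordResourceTestClient indexer, String supervisorId)
    {
      ITRetryUtil.retryUntilTrue(
          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(supervisorId)),
          "Waiting for supervisor " + supervisorId + " to be RUNNING"
      );
      indexer.suspendSupervisor(supervisorId);
      // ... verify ingestion is paused, then bring it back:
      indexer.resumeSupervisor(supervisorId);
    }
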
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java
index e082ffc..2c3b59d 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/guice/DruidTestModuleFactory.java
@@ -57,7 +57,6 @@ public class DruidTestModuleFactory implements IModuleFactory
   @Override
   public Module createModule(ITestContext context, Class<?> testClass)
   {
-    context.addGuiceModule(DruidTestModule.class, MODULE);
     context.addInjector(Collections.singletonList(MODULE), INJECTOR);
     return MODULE;
   }
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java
new file mode 100644
index 0000000..4c6518d
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/DruidClusterAdminClient.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.github.dockerjava.api.DockerClient;
+import com.github.dockerjava.api.model.Container;
+import com.github.dockerjava.core.DockerClientBuilder;
+import com.github.dockerjava.netty.NettyDockerCmdExecFactory;
+import com.google.inject.Inject;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.guice.TestClient;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.net.URL;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+public class DruidClusterAdminClient
+{
+  private static final Logger LOG = new Logger(DruidClusterAdminClient.class);
+  private static final String COORDINATOR_DOCKER_CONTAINER_NAME = "/druid-coordinator";
+  private static final String HISTORICAL_DOCKER_CONTAINER_NAME = "/druid-historical";
+  private static final String INDEXER_DOCKER_CONTAINER_NAME = "/druid-overlord";
+  private static final String BROKERR_DOCKER_CONTAINER_NAME = "/druid-broker";
+  private static final String ROUTER_DOCKER_CONTAINER_NAME = "/druid-router";
+  private static final String MIDDLEMANAGER_DOCKER_CONTAINER_NAME = "/druid-middlemanager";
+
+  private final HttpClient httpClient;
+  private IntegrationTestingConfig config;
+
+  @Inject
+  DruidClusterAdminClient(
+      @TestClient HttpClient httpClient,
+      IntegrationTestingConfig config
+  )
+  {
+    this.httpClient = httpClient;
+    this.config = config;
+  }
+
+  public void restartCoordinatorContainer()
+  {
+    restartDockerContainer(COORDINATOR_DOCKER_CONTAINER_NAME);
+  }
+
+  public void restartHistoricalContainer()
+  {
+    restartDockerContainer(HISTORICAL_DOCKER_CONTAINER_NAME);
+  }
+
+  public void restartIndexerContainer()
+  {
+    restartDockerContainer(INDEXER_DOCKER_CONTAINER_NAME);
+  }
+
+  public void restartBrokerContainer()
+  {
+    restartDockerContainer(BROKERR_DOCKER_CONTAINER_NAME);
+  }
+
+  public void restartRouterContainer()
+  {
+    restartDockerContainer(ROUTER_DOCKER_CONTAINER_NAME);
+  }
+
+  public void restartMiddleManagerContainer()
+  {
+    restartDockerContainer(MIDDLEMANAGER_DOCKER_CONTAINER_NAME);
+  }
+
+  public void waitUntilCoordinatorReady()
+  {
+    waitUntilInstanceReady(config.getCoordinatorUrl());
+  }
+
+  public void waitUntilHistoricalReady()
+  {
+    waitUntilInstanceReady(config.getHistoricalUrl());
+  }
+
+  public void waitUntilIndexerReady()
+  {
+    waitUntilInstanceReady(config.getIndexerUrl());
+  }
+
+  public void waitUntilBrokerReady()
+  {
+    waitUntilInstanceReady(config.getBrokerUrl());
+  }
+
+  public void waitUntilRouterReady()
+  {
+    waitUntilInstanceReady(config.getRouterUrl());
+  }
+
+  private void restartDockerContainer(String serviceName)
+  {
+    DockerClient dockerClient = DockerClientBuilder.getInstance()
+                                                   .withDockerCmdExecFactory((new NettyDockerCmdExecFactory())
+                                                                                 .withConnectTimeout(10 * 1000))
+                                                   .build();
+    List<Container> containers = dockerClient.listContainersCmd().exec();
+    Optional<String> containerId = containers.stream()
+                                             .filter(container -> Arrays.asList(container.getNames()).contains(serviceName))
+                                             .findFirst()
+                                             .map(container -> container.getId());
+
+    if (!containerId.isPresent()) {
+      LOG.error("Cannot find docker container for " + serviceName);
+      throw new ISE("Cannot find docker container for " + serviceName);
+    }
+    dockerClient.restartContainerCmd(containerId.get()).exec();
+  }
+
+  private void waitUntilInstanceReady(final String host)
+  {
+    ITRetryUtil.retryUntilTrue(
+        () -> {
+          try {
+            StatusResponseHolder response = httpClient.go(
+                new Request(HttpMethod.GET, new URL(StringUtils.format("%s/status/health", host))),
+                StatusResponseHandler.getInstance()
+            ).get();
+
+            LOG.info("%s %s", response.getStatus(), response.getContent());
+            return response.getStatus().equals(HttpResponseStatus.OK);
+          }
+          catch (Throwable e) {
+            LOG.error(e, "");
+            return false;
+          }
+        },
+        "Waiting for instance to be ready: [" + host + "]"
+    );
+  }
+}
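
A minimal usage sketch for the client above, assuming it is injected the same way SuiteListener obtains it further below; the helper method name is illustrative:

    @Inject
    private DruidClusterAdminClient druidClusterAdminClient;

    // Restart the coordinator container and block until its /status/health endpoint reports OK again.
    private void restartCoordinatorAndWait()
    {
      druidClusterAdminClient.restartCoordinatorContainer();
      druidClusterAdminClient.waitUntilCoordinatorReady();
    }
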
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java
new file mode 100644
index 0000000..d63d088
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.CreatePartitionsResult;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class KafkaAdminClient implements StreamAdminClient
+{
+  private AdminClient adminClient;
+
+  public KafkaAdminClient(String kafkaInternalHost)
+  {
+    Properties config = new Properties();
+    config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaInternalHost);
+    adminClient = AdminClient.create(config);
+  }
+
+  @Override
+  public void createStream(String streamName, int partitionCount, Map<String, String> tags) throws Exception
+  {
+    final short replicationFactor = 1;
+    final NewTopic newTopic = new NewTopic(streamName, partitionCount, replicationFactor);
+    final CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
+    // Wait for topic creation to complete
+    createTopicsResult.values().get(streamName).get();
+  }
+
+  @Override
+  public void deleteStream(String streamName) throws Exception
+  {
+    DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(ImmutableList.of(streamName));
+    deleteTopicsResult.values().get(streamName).get();
+  }
+
+  /**
+   * This method can only increase the partition count of {@param streamName} so that the final partition
+   * count is {@param newPartitionCount}.
+   * If {@param blocksUntilStarted} is set to true, this method blocks until the repartitioning has
+   * started (but not necessarily finished); otherwise, it returns right after issuing the
+   * repartitioning command.
+   */
+  @Override
+  public void updatePartitionCount(String streamName, int newPartitionCount, boolean blocksUntilStarted) throws Exception
+  {
+    Map<String, NewPartitions> counts = new HashMap<>();
+    counts.put(streamName, NewPartitions.increaseTo(newPartitionCount));
+    CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(counts);
+    if (blocksUntilStarted) {
+      createPartitionsResult.values().get(streamName).get();
+
+    }
+  }
+
+  /**
+   * Stream state such as active/non-active does not apply to Kafka.
+   * Returns true since a Kafka topic is always active and can always be written to and read from.
+   */
+  @Override
+  public boolean isStreamActive(String streamName)
+  {
+    return true;
+  }
+
+  @Override
+  public int getStreamPartitionCount(String streamName) throws Exception
+  {
+    DescribeTopicsResult result = adminClient.describeTopics(ImmutableList.of(streamName));
+    TopicDescription topicDescription = result.values().get(streamName).get();
+    return topicDescription.partitions().size();
+  }
+
+  @Override
+  public boolean verfiyPartitionCountUpdated(String streamName, int oldPartitionCount, int newPartitionCount) throws Exception
+  {
+    return getStreamPartitionCount(streamName) == newPartitionCount;
+  }
+}
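
A short sketch of driving the Kafka admin client above from test code (inside a method that declares throws Exception); the bootstrap address and topic name are placeholders:

    KafkaAdminClient kafkaAdmin = new KafkaAdminClient("localhost:9092");  // placeholder bootstrap server
    kafkaAdmin.createStream("test_topic", 2, null);                        // tags are not used for Kafka
    kafkaAdmin.updatePartitionCount("test_topic", 4, true);                // partition count can only grow
    int partitionCount = kafkaAdmin.getStreamPartitionCount("test_topic"); // 4 once the update has been applied
    kafkaAdmin.deleteStream("test_topic");
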
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
new file mode 100644
index 0000000..f7ec755
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import org.apache.druid.indexer.TaskIdUtils;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+public class KafkaEventWriter implements StreamEventWriter
+{
+  private static final String TEST_PROPERTY_PREFIX = "kafka.test.property.";
+  private final KafkaProducer<String, String> producer;
+  private final boolean txnEnabled;
+  private final List<Future<RecordMetadata>> pendingWriteRecords = new ArrayList<>();
+
+  public KafkaEventWriter(IntegrationTestingConfig config, boolean txnEnabled)
+  {
+    Properties properties = new Properties();
+    addFilteredProperties(config, properties);
+    properties.setProperty("bootstrap.servers", config.getKafkaHost());
+    properties.setProperty("acks", "all");
+    properties.setProperty("retries", "3");
+    properties.setProperty("key.serializer", ByteArraySerializer.class.getName());
+    properties.setProperty("value.serializer", ByteArraySerializer.class.getName());
+    this.txnEnabled = txnEnabled;
+    if (txnEnabled) {
+      properties.setProperty("enable.idempotence", "true");
+      properties.setProperty("transactional.id", TaskIdUtils.getRandomId());
+    }
+    this.producer = new KafkaProducer<>(
+        properties,
+        new StringSerializer(),
+        new StringSerializer()
+    );
+    if (txnEnabled) {
+      producer.initTransactions();
+    }
+  }
+
+  @Override
+  public boolean isTransactionEnabled()
+  {
+    return txnEnabled;
+  }
+
+  @Override
+  public void initTransaction()
+  {
+    if (txnEnabled) {
+      producer.beginTransaction();
+    } else {
+      throw new IllegalStateException("Kafka writer was initialized with transaction disabled");
+    }
+  }
+
+  @Override
+  public void commitTransaction()
+  {
+    if (txnEnabled) {
+      producer.commitTransaction();
+    } else {
+      throw new IllegalStateException("Kafka writer was initialized with transaction disabled");
+    }
+  }
+
+  @Override
+  public void write(String topic, String event)
+  {
+    Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, event));
+    pendingWriteRecords.add(future);
+  }
+
+  @Override
+  public void shutdown()
+  {
+    producer.close();
+  }
+
+  @Override
+  public void flush() throws Exception
+  {
+    for (Future<RecordMetadata> future : pendingWriteRecords) {
+      future.get();
+    }
+    pendingWriteRecords.clear();
+  }
+
+  private void addFilteredProperties(IntegrationTestingConfig config, Properties properties)
+  {
+    for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
+      if (entry.getKey().startsWith(TEST_PROPERTY_PREFIX)) {
+        properties.setProperty(entry.getKey().substring(TEST_PROPERTY_PREFIX.length()), entry.getValue());
+      }
+    }
+  }
+}
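
A sketch of the transactional write path the writer above supports, assuming an injected IntegrationTestingConfig and a placeholder topic and payload (flush() throws Exception):

    KafkaEventWriter writer = new KafkaEventWriter(config, true); // txnEnabled = true
    writer.initTransaction();
    writer.write("test_topic", "{\"page\": \"Gypsy Danger\", \"added\": 1}");
    writer.commitTransaction();
    writer.flush();    // blocks until every pending send has completed
    writer.shutdown();
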
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java
new file mode 100644
index 0000000..7c8759a
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisAdminClient.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import com.amazonaws.services.kinesis.model.AddTagsToStreamRequest;
+import com.amazonaws.services.kinesis.model.AddTagsToStreamResult;
+import com.amazonaws.services.kinesis.model.CreateStreamResult;
+import com.amazonaws.services.kinesis.model.DeleteStreamResult;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.ScalingType;
+import com.amazonaws.services.kinesis.model.StreamDescription;
+import com.amazonaws.services.kinesis.model.StreamStatus;
+import com.amazonaws.services.kinesis.model.UpdateShardCountRequest;
+import com.amazonaws.services.kinesis.model.UpdateShardCountResult;
+import com.amazonaws.util.AwsHostNameUtils;
+import org.apache.druid.java.util.common.ISE;
+
+import java.io.FileInputStream;
+import java.util.Map;
+import java.util.Properties;
+
+public class KinesisAdminClient implements StreamAdminClient
+{
+  private AmazonKinesis amazonKinesis;
+
+  public KinesisAdminClient(String endpoint) throws Exception
+  {
+    String pathToConfigFile = System.getProperty("override.config.path");
+    Properties prop = new Properties();
+    prop.load(new FileInputStream(pathToConfigFile));
+
+    AWSStaticCredentialsProvider credentials = new AWSStaticCredentialsProvider(
+        new BasicAWSCredentials(
+            prop.getProperty("druid_kinesis_accessKey"),
+            prop.getProperty("druid_kinesis_secretKey")
+        )
+    );
+    amazonKinesis = AmazonKinesisClientBuilder.standard()
+                              .withCredentials(credentials)
+                              .withClientConfiguration(new ClientConfiguration())
+                              .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
+                                  endpoint,
+                                  AwsHostNameUtils.parseRegion(
+                                      endpoint,
+                                      null
+                                  )
+                              )).build();
+  }
+
+  @Override
+  public void createStream(String streamName, int shardCount, Map<String, String> tags)
+  {
+    CreateStreamResult createStreamResult = amazonKinesis.createStream(streamName, shardCount);
+    if (createStreamResult.getSdkHttpMetadata().getHttpStatusCode() != 200) {
+      throw new ISE("Cannot create stream for integration test");
+    }
+    if (tags != null && !tags.isEmpty()) {
+      AddTagsToStreamRequest addTagsToStreamRequest = new AddTagsToStreamRequest();
+      addTagsToStreamRequest.setStreamName(streamName);
+      addTagsToStreamRequest.setTags(tags);
+      AddTagsToStreamResult addTagsToStreamResult = amazonKinesis.addTagsToStream(addTagsToStreamRequest);
+      if (addTagsToStreamResult.getSdkHttpMetadata().getHttpStatusCode() != 200) {
+        throw new ISE("Cannot tag stream for integration test");
+      }
+    }
+
+  }
+
+  @Override
+  public void deleteStream(String streamName)
+  {
+    DeleteStreamResult deleteStreamResult = amazonKinesis.deleteStream(streamName);
+    if (deleteStreamResult.getSdkHttpMetadata().getHttpStatusCode() != 200) {
+      throw new ISE("Cannot delete stream for integration test");
+    }
+  }
+
+  /**
+   * This method updates the shard count of {@param streamName} so that the final shard count is {@param newShardCount}.
+   * If {@param blocksUntilStarted} is set to true, this method blocks until the resharding has
+   * started (but not necessarily finished); otherwise, it returns right after issuing the reshard command.
+   */
+  @Override
+  public void updatePartitionCount(String streamName, int newShardCount, boolean blocksUntilStarted)
+  {
+    int originalShardCount = getStreamPartitionCount(streamName);
+    UpdateShardCountRequest updateShardCountRequest = new UpdateShardCountRequest();
+    updateShardCountRequest.setStreamName(streamName);
+    updateShardCountRequest.setTargetShardCount(newShardCount);
+    updateShardCountRequest.setScalingType(ScalingType.UNIFORM_SCALING);
+    UpdateShardCountResult updateShardCountResult = amazonKinesis.updateShardCount(updateShardCountRequest);
+    if (updateShardCountResult.getSdkHttpMetadata().getHttpStatusCode() != 200) {
+      throw new ISE("Cannot update stream's shard count for integration test");
+    }
+    if (blocksUntilStarted) {
+      // Wait until the resharding started (or finished)
+      ITRetryUtil.retryUntil(
+          () -> {
+            StreamDescription streamDescription = getStreamDescription(streamName);
+            int updatedShardCount = getStreamShardCount(streamDescription);
+            return verifyStreamStatus(streamDescription, StreamStatus.UPDATING) ||
+                (verifyStreamStatus(streamDescription, StreamStatus.ACTIVE) && updatedShardCount > originalShardCount);
+          },
+          true,
+          30,
+          30,
+          "Kinesis stream resharding to start (or finished)"
+      );
+    }
+  }
+
+  @Override
+  public boolean isStreamActive(String streamName)
+  {
+    StreamDescription streamDescription = getStreamDescription(streamName);
+    return verifyStreamStatus(streamDescription, StreamStatus.ACTIVE);
+  }
+
+  @Override
+  public int getStreamPartitionCount(String streamName)
+  {
+    StreamDescription streamDescription = getStreamDescription(streamName);
+    return getStreamShardCount(streamDescription);
+  }
+
+  @Override
+  public boolean verfiyPartitionCountUpdated(String streamName, int oldShardCount, int newShardCount)
+  {
+    int actualShardCount = getStreamPartitionCount(streamName);
+    // Kinesis does not immediately drop the old shards after resharding and hence
+    // still returns both open and closed shards from the API call.
+    // To verify, we sum the old count (closed shards) and the expected new count (open shards).
+    return actualShardCount == oldShardCount + newShardCount;
+  }
+
+
+  private boolean verifyStreamStatus(StreamDescription streamDescription, StreamStatus streamStatusToCheck)
+  {
+    return streamStatusToCheck.toString().equals(streamDescription.getStreamStatus());
+  }
+
+  private int getStreamShardCount(StreamDescription streamDescription)
+  {
+    return streamDescription.getShards().size();
+  }
+
+  private StreamDescription getStreamDescription(String streamName)
+  {
+    DescribeStreamResult describeStreamResult = amazonKinesis.describeStream(streamName);
+    if (describeStreamResult.getSdkHttpMetadata().getHttpStatusCode() != 200) {
+      throw new ISE("Cannot get stream description for integration test");
+    }
+    return describeStreamResult.getStreamDescription();
+  }
+}
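
To illustrate the verification comment above with placeholder values: if a stream starts with 2 shards and is resharded to 3, Kinesis keeps listing the 2 closed shards alongside the 3 open ones, so the check passes once 5 shards are reported:

    kinesisAdmin.updatePartitionCount("test_stream", 3, true);
    // becomes true once describeStream reports 2 closed + 3 open = 5 shards
    boolean updated = kinesisAdmin.verfiyPartitionCountUpdated("test_stream", 2, 3);
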
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java
new file mode 100644
index 0000000..0377e9e
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.kinesis.producer.KinesisProducer;
+import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
+import com.amazonaws.util.AwsHostNameUtils;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.io.FileInputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+
+public class KinesisEventWriter implements StreamEventWriter
+{
+  private static final Logger LOG = new Logger(KinesisEventWriter.class);
+
+  private final KinesisProducer kinesisProducer;
+
+  public KinesisEventWriter(String endpoint, boolean aggregate) throws Exception
+  {
+    String pathToConfigFile = System.getProperty("override.config.path");
+    Properties prop = new Properties();
+    prop.load(new FileInputStream(pathToConfigFile));
+
+    AWSStaticCredentialsProvider credentials = new AWSStaticCredentialsProvider(
+        new BasicAWSCredentials(
+            prop.getProperty("druid_kinesis_accessKey"),
+            prop.getProperty("druid_kinesis_secretKey")
+        )
+    );
+
+    KinesisProducerConfiguration kinesisProducerConfiguration = new KinesisProducerConfiguration()
+        .setCredentialsProvider(credentials)
+        .setRegion(AwsHostNameUtils.parseRegion(endpoint, null))
+        .setRequestTimeout(600000L)
+        .setConnectTimeout(300000L)
+        .setRecordTtl(9223372036854775807L) // Long.MAX_VALUE: effectively never expire buffered records
+        .setMetricsLevel("none")
+        .setAggregationEnabled(aggregate);
+
+    this.kinesisProducer = new KinesisProducer(kinesisProducerConfiguration);
+  }
+
+  @Override
+  public boolean isTransactionEnabled()
+  {
+    return false;
+  }
+
+  @Override
+  public void initTransaction()
+  {
+    // No-Op as Kinesis does not support transaction
+  }
+
+  @Override
+  public void commitTransaction()
+  {
+    // No-Op as Kinesis does not support transaction
+  }
+
+  @Override
+  public void write(String streamName, String event)
+  {
+    kinesisProducer.addUserRecord(
+        streamName,
+        DigestUtils.sha1Hex(event),
+        ByteBuffer.wrap(event.getBytes(StandardCharsets.UTF_8))
+    );
+  }
+
+  @Override
+  public void shutdown()
+  {
+    LOG.info("Shutting down Kinesis client");
+    kinesisProducer.flushSync();
+  }
+
+  @Override
+  public void flush()
+  {
+    kinesisProducer.flushSync();
+    ITRetryUtil.retryUntil(
+        () -> kinesisProducer.getOutstandingRecordsCount() == 0,
+        true,
+        10000,
+        30,
+        "Waiting for all Kinesis writes to be flushed"
+    );
+  }
+}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamAdminClient.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamAdminClient.java
new file mode 100644
index 0000000..ea36d1c
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamAdminClient.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import java.util.Map;
+
+/**
+ * This interface provides the administrative client contract for any stream storage (such as Kafka and Kinesis)
+ * which supports managing and inspecting streams (aka topics) and their partitions (aka shards).
+ * This is used for setting up, tearing down and any other administrative changes required in integration tests.
+ * Each method resulting in a change of state for the stream is intended to be synchronous to help
+ * make integration tests deterministic and easy to write.
+ */
+public interface StreamAdminClient
+{
+  void createStream(String streamName, int partitionCount, Map<String, String> tags) throws Exception;
+
+  void deleteStream(String streamName) throws Exception;
+
+  void updatePartitionCount(String streamName, int newPartitionCount, boolean blocksUntilStarted) throws Exception;
+
+  boolean isStreamActive(String streamName);
+
+  int getStreamPartitionCount(String streamName) throws Exception;
+
+  boolean verfiyPartitionCountUpdated(String streamName, int oldPartitionCount, int newPartitionCount) throws Exception;
+}
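
A sketch of the setup/teardown pattern this contract is meant for, written against the interface so it works for Kafka and Kinesis alike; the stream name is a placeholder, and the retry accounts for Kinesis streams becoming active asynchronously:

    void setUpStream(StreamAdminClient adminClient) throws Exception
    {
      adminClient.createStream("test_stream", 2, null);
      ITRetryUtil.retryUntilTrue(() -> adminClient.isStreamActive("test_stream"), "Waiting for stream to be active");
    }

    void tearDownStream(StreamAdminClient adminClient) throws Exception
    {
      adminClient.deleteStream("test_stream");
    }
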
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java
new file mode 100644
index 0000000..5d25916
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+
+/**
+ * This interface is used to write test event data to the underlying stream (such as Kafka or Kinesis).
+ * It can also be used with a {@link StreamGenerator} to write a particular set of test data.
+ */
+public interface StreamEventWriter
+{
+  void write(String topic, String event);
+
+  void shutdown();
+
+  void flush() throws Exception;
+
+  boolean isTransactionEnabled();
+
+  void initTransaction();
+
+  void commitTransaction();
+}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamGenerator.java
new file mode 100644
index 0000000..f2d1f48
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamGenerator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import org.joda.time.DateTime;
+
+public interface StreamGenerator
+{
+  void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds);
+
+  void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds, DateTime overrideFirstEventTime);
+
+  void shutdown();
+}
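
A sketch of wiring a generator to a writer using classes from this patch; the rates, topic name, and Kafka config are placeholders:

    StreamGenerator generator = new WikipediaStreamEventStreamGenerator(100, 100); // 100 events/sec, 100 ms cycle padding
    StreamEventWriter writer = new KafkaEventWriter(config, false);                // non-transactional writer
    generator.run("test_topic", writer, 60);                                       // write one minute of events
    writer.shutdown();
    generator.shutdown();
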
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierEventGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierEventGenerator.java
new file mode 100644
index 0000000..bb56c79
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierEventGenerator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import org.apache.druid.java.util.common.DateTimes;
+import org.joda.time.DateTime;
+
+import java.util.UUID;
+
+public class StreamVerifierEventGenerator extends SyntheticStreamGenerator
+{
+  public StreamVerifierEventGenerator(int eventsPerSeconds, long cyclePaddingMs)
+  {
+    super(eventsPerSeconds, cyclePaddingMs);
+  }
+
+  @Override
+  Object getEvent(int i, DateTime timestamp)
+  {
+    return StreamVerifierSyntheticEvent.of(
+        UUID.randomUUID().toString(),
+        timestamp.getMillis(),
+        DateTimes.nowUtc().getMillis(),
+        i,
+        i == getEventsPerSecond() ? getSumOfEventSequence(getEventsPerSecond()) : null,
+        i == 1
+    );
+  }
+
+
+  /**
+   * Assumes the first number in the sequence is 1, incrementing by 1, until numEvents.
+   */
+  private long getSumOfEventSequence(int numEvents)
+  {
+    return ((long) numEvents * (1 + numEvents)) / 2;
+  }
+}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierSyntheticEvent.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierSyntheticEvent.java
new file mode 100644
index 0000000..e8c314a
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierSyntheticEvent.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class StreamVerifierSyntheticEvent
+{
+  private String id;
+  private long groupingTimestamp;
+  private long insertionTimestamp;
+  private long sequenceNumber;
+  private Long expectedSequenceNumberSum;
+  private boolean firstEvent;
+
+  public StreamVerifierSyntheticEvent(
+      String id,
+      long groupingTimestamp,
+      long insertionTimestamp,
+      long sequenceNumber,
+      Long expectedSequenceNumberSum,
+      boolean firstEvent
+  )
+  {
+    this.id = id;
+    this.groupingTimestamp = groupingTimestamp;
+    this.insertionTimestamp = insertionTimestamp;
+    this.sequenceNumber = sequenceNumber;
+    this.expectedSequenceNumberSum = expectedSequenceNumberSum;
+    this.firstEvent = firstEvent;
+  }
+
+  @JsonProperty
+  public String getId()
+  {
+    return id;
+  }
+
+  @JsonProperty
+  public long getGroupingTimestamp()
+  {
+    return groupingTimestamp;
+  }
+
+  @JsonProperty
+  public long getInsertionTimestamp()
+  {
+    return insertionTimestamp;
+  }
+
+  @JsonProperty
+  public long getSequenceNumber()
+  {
+    return sequenceNumber;
+  }
+
+  @JsonProperty
+  public Long getExpectedSequenceNumberSum()
+  {
+    return expectedSequenceNumberSum;
+  }
+
+  @JsonProperty
+  public Integer getFirstEventFlag()
+  {
+    return firstEvent ? 1 : null;
+  }
+
+  public static StreamVerifierSyntheticEvent of(
+      String id,
+      long groupingTimestamp,
+      long insertionTimestamp,
+      long sequenceNumber,
+      Long expectedSequenceNumberSum,
+      boolean firstEvent
+  )
+  {
+    return new StreamVerifierSyntheticEvent(
+        id,
+        groupingTimestamp,
+        insertionTimestamp,
+        sequenceNumber,
+        expectedSequenceNumberSum,
+        firstEvent
+    );
+  }
+}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/SuiteListener.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/SuiteListener.java
new file mode 100644
index 0000000..6259156
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/SuiteListener.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.google.inject.Injector;
+import org.apache.druid.java.util.common.lifecycle.Lifecycle;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.testng.ISuite;
+import org.testng.ISuiteListener;
+
+public class SuiteListener implements ISuiteListener
+{
+  private static final Logger LOG = new Logger(SuiteListener.class);
+
+  @Override
+  public void onStart(ISuite suite)
+  {
+    Injector injector = DruidTestModuleFactory.getInjector();
+    IntegrationTestingConfig config = injector.getInstance(IntegrationTestingConfig.class);
+    DruidClusterAdminClient druidClusterAdminClient = injector.getInstance(DruidClusterAdminClient.class);
+
+    druidClusterAdminClient.waitUntilCoordinatorReady();
+    druidClusterAdminClient.waitUntilIndexerReady();
+    druidClusterAdminClient.waitUntilBrokerReady();
+    String routerHost = config.getRouterUrl();
+    if (null != routerHost) {
+      druidClusterAdminClient.waitUntilRouterReady();
+    }
+    Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
+    try {
+      lifecycle.start();
+    }
+    catch (Exception e) {
+      LOG.error(e, "");
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void onFinish(ISuite suite)
+  {
+    Injector injector = DruidTestModuleFactory.getInjector();
+    Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
+    lifecycle.stop();
+  }
+}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
new file mode 100644
index 0000000..f2bfde8
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.joda.time.DateTime;
+
+public abstract class SyntheticStreamGenerator implements StreamGenerator
+{
+  private static final Logger log = new Logger(SyntheticStreamGenerator.class);
+  static final ObjectMapper MAPPER = new DefaultObjectMapper();
+
+  static {
+    MAPPER.setInjectableValues(
+        new InjectableValues.Std()
+            .addValue(ObjectMapper.class.getName(), MAPPER)
+    );
+    MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+  }
+
+  public int getEventsPerSecond()
+  {
+    return eventsPerSecond;
+  }
+
+  private final int eventsPerSecond;
+
+  // When calculating rates, leave this buffer to minimize overruns where we're still writing messages from the previous
+  // second. If the generator finishes sending [eventsPerSecond] events and the second is not up, it will wait for the next
+  // second to begin.
+  private final long cyclePaddingMs;
+
+  public SyntheticStreamGenerator(int eventsPerSecond, long cyclePaddingMs)
+  {
+    this.eventsPerSecond = eventsPerSecond;
+    this.cyclePaddingMs = cyclePaddingMs;
+  }
+
+  abstract Object getEvent(int row, DateTime timestamp);
+
+  @Override
+  public void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds)
+  {
+    run(streamTopic, streamEventWriter, totalNumberOfSeconds, null);
+  }
+
+  @Override
+  public void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds, DateTime overrideFirstEventTime)
+  {
+    // The idea here is that we will send [eventsPerSecond] events that either use [nowCeilingToSecond]
+    // or [overrideFirstEventTime] as the primary timestamp.
+    // Having a fixed number of events that use the same timestamp helps us determine whether any events
+    // were dropped or duplicated. We try to space the event generation over the remainder of the second so that it
+    // roughly completes at the top of the second, but if it doesn't complete, it will still send the remainder of the
+    // events with the original timestamp, even after wall time has moved onto the next second.
+    DateTime nowCeilingToSecond = DateTimes.nowUtc().secondOfDay().roundCeilingCopy();
+    DateTime eventTimestamp = overrideFirstEventTime == null ? nowCeilingToSecond : overrideFirstEventTime;
+    int seconds = 0;
+
+    while (true) {
+      try {
+        long sleepMillis = nowCeilingToSecond.getMillis() - DateTimes.nowUtc().getMillis();
+        if (sleepMillis > 0) {
+          log.info("Waiting %s ms for next run cycle (at %s)", sleepMillis, nowCeilingToSecond);
+          Thread.sleep(sleepMillis);
+          continue;
+        }
+
+        log.info(
+            "Beginning run cycle with %s events, target completion time: %s",
+            eventsPerSecond,
+            nowCeilingToSecond.plusSeconds(1).minus(cyclePaddingMs)
+        );
+
+        if (streamEventWriter.isTransactionEnabled()) {
+          streamEventWriter.initTransaction();
+        }
+
+        for (int i = 1; i <= eventsPerSecond; i++) {
+          streamEventWriter.write(streamTopic, MAPPER.writeValueAsString(getEvent(i, eventTimestamp)));
+
+          long sleepTime = calculateSleepTimeMs(eventsPerSecond - i, nowCeilingToSecond);
+          if ((i <= 100 && i % 10 == 0) || i % 100 == 0) {
+            log.info("Event: %s/%s, sleep time: %s ms", i, eventsPerSecond, sleepTime);
+          }
+
+          if (sleepTime > 0) {
+            Thread.sleep(sleepTime);
+          }
+        }
+
+        if (streamEventWriter.isTransactionEnabled()) {
+          streamEventWriter.commitTransaction();
+        }
+
+        nowCeilingToSecond = nowCeilingToSecond.plusSeconds(1);
+        eventTimestamp = eventTimestamp.plusSeconds(1);
+        seconds++;
+
+        log.info(
+            "Finished writing %s events, current time: %s - updating next timestamp to: %s",
+            eventsPerSecond,
+            DateTimes.nowUtc(),
+            nowCeilingToSecond
+        );
+
+        if (seconds >= totalNumberOfSeconds) {
+          streamEventWriter.flush();
+          log.info(
+              "Finished writing %s seconds",
+              seconds
+          );
+          break;
+        }
+      }
+      catch (Exception e) {
+        throw new RuntimeException("Exception in event generation loop", e);
+      }
+    }
+  }
+
+  @Override
+  public void shutdown()
+  {
+  }
+
+  /**
+   * Dynamically adjust delay between messages to spread them out over the remaining time left in the second.
+   */
+  private long calculateSleepTimeMs(long eventsRemaining, DateTime secondBeingProcessed)
+  {
+    if (eventsRemaining == 0) {
+      return 0;
+    }
+
+    DateTime now = DateTimes.nowUtc();
+    DateTime nextSecondToProcessMinusBuffer = secondBeingProcessed.plusSeconds(1).minus(cyclePaddingMs);
+
+    if (nextSecondToProcessMinusBuffer.isBefore(now)) {
+      return 0; // We're late!! Write messages as fast as you can
+    }
+
+    return (nextSecondToProcessMinusBuffer.getMillis() - now.getMillis()) / eventsRemaining;
+  }
+}
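
As a concrete illustration of the pacing above (with hypothetical settings of 100 events per second and cyclePaddingMs = 100): roughly 900 ms of each second is spread across the remaining writes, i.e. about 9 ms of sleep between events; once the padded deadline has passed, calculateSleepTimeMs returns 0 and the rest of that second's events are written as fast as possible while keeping the original timestamp.
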
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/WikipediaStreamEventStreamGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/WikipediaStreamEventStreamGenerator.java
new file mode 100644
index 0000000..4fea67d
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/WikipediaStreamEventStreamGenerator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class WikipediaStreamEventStreamGenerator extends SyntheticStreamGenerator
+{
+  private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
+
+  public WikipediaStreamEventStreamGenerator(int eventsPerSeconds, long cyclePaddingMs)
+  {
+    super(eventsPerSeconds, cyclePaddingMs);
+  }
+
+  @Override
+  Object getEvent(int i, DateTime timestamp)
+  {
+    Map<String, Object> event = new HashMap<>();
+    event.put("page", "Gypsy Danger");
+    event.put("language", "en");
+    event.put("user", "nuclear");
+    event.put("unpatrolled", "true");
+    event.put("newPage", "true");
+    event.put("robot", "false");
+    event.put("anonymous", "false");
+    event.put("namespace", "article");
+    event.put("continent", "North America");
+    event.put("country", "United States");
+    event.put("region", "Bay Area");
+    event.put("city", "San Francisco");
+    event.put("timestamp", DATE_TIME_FORMATTER.print(timestamp));
+    event.put("added", i);
+    event.put("deleted", 0);
+    event.put("delta", i);
+    return event;
+  }
+}
diff --git a/integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java b/integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java
deleted file mode 100644
index d6b2448..0000000
--- a/integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package /*CHECKSTYLE.OFF: PackageName*/org.testng/*CHECKSTYLE.ON: PackageName*/;
-
-import com.google.inject.Injector;
-import com.google.inject.Key;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.lifecycle.Lifecycle;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.http.client.HttpClient;
-import org.apache.druid.java.util.http.client.Request;
-import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
-import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
-import org.apache.druid.testing.IntegrationTestingConfig;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.testing.guice.TestClient;
-import org.apache.druid.testing.utils.ITRetryUtil;
-import org.jboss.netty.handler.codec.http.HttpMethod;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.testng.internal.IConfiguration;
-import org.testng.internal.annotations.IAnnotationFinder;
-import org.testng.xml.XmlTest;
-
-import java.net.URL;
-import java.util.List;
-
-/**
- * This class must be in package org.testng to access protected methods like TestNG.getDefault().getConfiguration()
- */
-public class DruidTestRunnerFactory implements ITestRunnerFactory
-{
-  private static final Logger LOG = new Logger(DruidTestRunnerFactory.class);
-
-  @Override
-  public TestRunner newTestRunner(ISuite suite, XmlTest test, List<IInvokedMethodListener> listeners)
-  {
-    IConfiguration configuration = TestNG.getDefault().getConfiguration();
-    String outputDirectory = suite.getOutputDirectory();
-    IAnnotationFinder annotationFinder = configuration.getAnnotationFinder();
-    Boolean skipFailedInvocationCounts = suite.getXmlSuite().skipFailedInvocationCounts();
-    return new DruidTestRunner(
-      configuration,
-      suite,
-      test,
-      outputDirectory,
-      annotationFinder,
-      skipFailedInvocationCounts,
-      listeners
-    );
-  }
-
-  private static class DruidTestRunner extends TestRunner
-  {
-
-    protected DruidTestRunner(
-        IConfiguration configuration,
-        ISuite suite,
-        XmlTest test,
-        String outputDirectory,
-        IAnnotationFinder finder,
-        boolean skipFailedInvocationCounts,
-        List<IInvokedMethodListener> invokedMethodListeners
-    )
-    {
-      super(configuration, suite, test, outputDirectory, finder, skipFailedInvocationCounts, invokedMethodListeners);
-    }
-
-    @Override
-    public void run()
-    {
-      Injector injector = DruidTestModuleFactory.getInjector();
-      IntegrationTestingConfig config = injector.getInstance(IntegrationTestingConfig.class);
-      HttpClient client = injector.getInstance(Key.get(HttpClient.class, TestClient.class));
-
-      waitUntilInstanceReady(client, config.getCoordinatorUrl());
-      waitUntilInstanceReady(client, config.getIndexerUrl());
-      waitUntilInstanceReady(client, config.getBrokerUrl());
-      String routerHost = config.getRouterUrl();
-      if (null != routerHost) {
-        waitUntilInstanceReady(client, config.getRouterUrl());
-      }
-      Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
-      try {
-        lifecycle.start();
-        runTests();
-      }
-      catch (Exception e) {
-        LOG.error(e, "");
-        throw new RuntimeException(e);
-      }
-      finally {
-        lifecycle.stop();
-      }
-
-    }
-
-    private void runTests()
-    {
-      super.run();
-    }
-
-    public void waitUntilInstanceReady(final HttpClient client, final String host)
-    {
-      ITRetryUtil.retryUntilTrue(
-          () -> {
-            try {
-              StatusResponseHolder response = client.go(
-                  new Request(HttpMethod.GET, new URL(StringUtils.format("%s/status/health", host))),
-                  StatusResponseHandler.getInstance()
-              ).get();
-
-              LOG.info("%s %s", response.getStatus(), response.getContent());
-              return response.getStatus().equals(HttpResponseStatus.OK);
-            }
-            catch (Throwable e) {
-              LOG.error(e, "");
-              return false;
-            }
-          },
-          "Waiting for instance to be ready: [" + host + "]"
-      );
-    }
-  }
-}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
index 79be37b..9152228 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
@@ -26,12 +26,26 @@ package org.apache.druid.tests;
 public class TestNGGroup
 {
   public static final String BATCH_INDEX = "batch-index";
+
   public static final String HADOOP_INDEX = "hadoop-index";
+
   public static final String KAFKA_INDEX = "kafka-index";
+
+  public static final String KAFKA_INDEX_SLOW = "kafka-index-slow";
+
+  public static final String TRANSACTIONAL_KAFKA_INDEX = "kafka-transactional-index";
+
+  public static final String TRANSACTIONAL_KAFKA_INDEX_SLOW = "kafka-transactional-index-slow";
+
   public static final String OTHER_INDEX = "other-index";
+
   public static final String PERFECT_ROLLUP_PARALLEL_BATCH_INDEX = "perfect-rollup-parallel-batch-index";
-  // This group can only be run individually using -Dgroups=query since it requires specific test data setup.
+
+  /**
+   * This group can only be run individually using -Dgroups=query since it requires specific test data setup.
+   */
   public static final String QUERY = "query";
+
   public static final String REALTIME_INDEX = "realtime-index";
   // This group can only be run individually using -Dgroups=security since it requires specific test data setup.
   public static final String SECURITY = "security";
@@ -58,4 +72,12 @@ public class TestNGGroup
   // The path of the file must then be pass to mvn with -Doverride.config.path=<PATH_TO_FILE>
   // See integration-tests/docker/environment-configs/override-examples/s3 for env vars to provide.
   public static final String S3_INGESTION = "s3-ingestion";
+
+  /**
+   * This group is not part of CI. To run this group, AWS Kinesis configs/credentials for your AWS Kinesis stream must be
+   * provided in a file. The path of the file must then be passed to mvn with -Doverride.config.path=<PATH_TO_FILE>.
+   * See integration-tests/docker/environment-configs/override-examples/kinesis for env vars to provide.
+   * The Kinesis stream endpoint for a region must also be passed to mvn with -Ddruid.test.config.streamEndpoint=<ENDPOINT>.
+   */
+  public static final String KINESIS_INDEX = "kinesis-index";
 }
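
As an example, the Kinesis group above could be run with a command along these lines (the Maven invocation and profile name are assumptions; the -D flags and placeholders come from the javadoc):

    mvn verify -P integration-tests \
      -Dgroups=kinesis-index \
      -Doverride.config.path=<PATH_TO_FILE> \
      -Ddruid.test.config.streamEndpoint=<ENDPOINT>
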
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
deleted file mode 100644
index 7ea8bd4..0000000
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.indexer;
-
-import com.google.common.base.Throwables;
-import com.google.inject.Inject;
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
-import org.apache.commons.io.IOUtils;
-import org.apache.druid.indexer.TaskIdUtils;
-import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.testing.IntegrationTestingConfig;
-import org.apache.druid.testing.utils.ITRetryUtil;
-import org.apache.druid.testing.utils.TestQueryHelper;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import java.util.Properties;
-
-abstract class AbstractKafkaIndexerTest extends AbstractIndexerTest
-{
-  private static final Logger LOG = new Logger(AbstractKafkaIndexerTest.class);
-  protected static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/kafka_supervisor_spec_legacy_parser.json";
-  protected static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/kafka_supervisor_spec_input_format.json";
-  private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json";
-  private static final String TOPIC_NAME = "kafka_indexing_service_topic";
-
-  private static final int NUM_EVENTS_TO_SEND = 60;
-  private static final long WAIT_TIME_MILLIS = 2 * 60 * 1000L;
-  private static final String TEST_PROPERTY_PREFIX = "kafka.test.property.";
-
-  // We'll fill in the current time and numbers for added, deleted and changed
-  // before sending the event.
-  private static final String EVENT_TEMPLATE =
-      "{\"timestamp\": \"%s\"," +
-      "\"page\": \"Gypsy Danger\"," +
-      "\"language\" : \"en\"," +
-      "\"user\" : \"nuclear\"," +
-      "\"unpatrolled\" : \"true\"," +
-      "\"newPage\" : \"true\"," +
-      "\"robot\": \"false\"," +
-      "\"anonymous\": \"false\"," +
-      "\"namespace\":\"article\"," +
-      "\"continent\":\"North America\"," +
-      "\"country\":\"United States\"," +
-      "\"region\":\"Bay Area\"," +
-      "\"city\":\"San Francisco\"," +
-      "\"added\":%d," +
-      "\"deleted\":%d," +
-      "\"delta\":%d}";
-
-  private ZkUtils zkUtils;
-  private boolean segmentsExist;   // to tell if we should remove segments during teardown
-
-  // format for the querying interval
-  private static final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
-  // format for the expected timestamp in a query response
-  private static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
-
-  @Inject
-  private TestQueryHelper queryHelper;
-  @Inject
-  private IntegrationTestingConfig config;
-
-  private String fullDatasourceName;
-
-  void doKafkaIndexTest(String dataSourceName, String supervisorSpecPath, boolean txnEnabled)
-  {
-    fullDatasourceName = dataSourceName + config.getExtraDatasourceNameSuffix();
-    // create topic
-    try {
-      int sessionTimeoutMs = 10000;
-      int connectionTimeoutMs = 10000;
-      String zkHosts = config.getZookeeperHosts();
-      ZkClient zkClient = new ZkClient(zkHosts, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$);
-      zkUtils = new ZkUtils(zkClient, new ZkConnection(zkHosts, sessionTimeoutMs), false);
-      if (config.manageKafkaTopic()) {
-        int numPartitions = 4;
-        int replicationFactor = 1;
-        Properties topicConfig = new Properties();
-        AdminUtils.createTopic(
-            zkUtils,
-            TOPIC_NAME,
-            numPartitions,
-            replicationFactor,
-            topicConfig,
-            RackAwareMode.Disabled$.MODULE$
-        );
-      }
-    }
-    catch (Exception e) {
-      throw new ISE(e, "could not create kafka topic");
-    }
-
-    String spec;
-    try {
-      LOG.info("supervisorSpec name: [%s]", supervisorSpecPath);
-      final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
-      final Properties consumerProperties = new Properties();
-      consumerProperties.putAll(consumerConfigs);
-      consumerProperties.setProperty("bootstrap.servers", config.getKafkaInternalHost());
-
-      spec = getResourceAsString(supervisorSpecPath);
-      spec = StringUtils.replace(spec, "%%DATASOURCE%%", fullDatasourceName);
-      spec = StringUtils.replace(spec, "%%TOPIC%%", TOPIC_NAME);
-      spec = StringUtils.replace(spec, "%%CONSUMER_PROPERTIES%%", jsonMapper.writeValueAsString(consumerProperties));
-      LOG.info("supervisorSpec: [%s]\n", spec);
-    }
-    catch (Exception e) {
-      LOG.error("could not read file [%s]", supervisorSpecPath);
-      throw new ISE(e, "could not read file [%s]", supervisorSpecPath);
-    }
-
-    // start supervisor
-    String supervisorId = indexer.submitSupervisor(spec);
-    LOG.info("Submitted supervisor");
-
-    // set up kafka producer
-    Properties properties = new Properties();
-    addFilteredProperties(config, properties);
-    properties.setProperty("bootstrap.servers", config.getKafkaHost());
-    LOG.info("Kafka bootstrap.servers: [%s]", config.getKafkaHost());
-    properties.setProperty("acks", "all");
-    properties.setProperty("retries", "3");
-    properties.setProperty("key.serializer", ByteArraySerializer.class.getName());
-    properties.setProperty("value.serializer", ByteArraySerializer.class.getName());
-    if (txnEnabled) {
-      properties.setProperty("enable.idempotence", "true");
-      properties.setProperty("transactional.id", TaskIdUtils.getRandomId());
-    }
-
-    KafkaProducer<String, String> producer = new KafkaProducer<>(
-        properties,
-        new StringSerializer(),
-        new StringSerializer()
-    );
-
-    DateTimeZone zone = DateTimes.inferTzFromString("UTC");
-    // format for putting into events
-    DateTimeFormatter event_fmt = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
-
-    DateTime dt = new DateTime(zone); // timestamp to put on events
-    DateTime dtFirst = dt;            // timestamp of 1st event
-    DateTime dtLast = dt;             // timestamp of last event
-
-    // these are used to compute the expected aggregations
-    int added = 0;
-    int num_events = 0;
-
-    // send data to kafka
-    if (txnEnabled) {
-      producer.initTransactions();
-      producer.beginTransaction();
-    }
-    while (num_events < NUM_EVENTS_TO_SEND) {
-      num_events++;
-      added += num_events;
-      // construct the event to send
-      String event = StringUtils.format(EVENT_TEMPLATE, event_fmt.print(dt), num_events, 0, num_events);
-      LOG.info("sending event: [%s]", event);
-      try {
-
-        producer.send(new ProducerRecord<>(TOPIC_NAME, event)).get();
-
-      }
-      catch (Exception ioe) {
-        throw Throwables.propagate(ioe);
-      }
-
-      dtLast = dt;
-      dt = new DateTime(zone);
-    }
-    if (txnEnabled) {
-      producer.commitTransaction();
-    }
-    producer.close();
-
-    LOG.info("Waiting for [%s] millis for Kafka indexing tasks to consume events", WAIT_TIME_MILLIS);
-    try {
-      Thread.sleep(WAIT_TIME_MILLIS);
-    }
-    catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    }
-
-    InputStream is = AbstractKafkaIndexerTest.class.getResourceAsStream(QUERIES_FILE);
-    if (null == is) {
-      throw new ISE("could not open query file: %s", QUERIES_FILE);
-    }
-
-    // put the timestamps into the query structure
-    String query_response_template;
-    try {
-      query_response_template = IOUtils.toString(is, StandardCharsets.UTF_8);
-    }
-    catch (IOException e) {
-      throw new ISE(e, "could not read query file: %s", QUERIES_FILE);
-    }
-
-    String queryStr = query_response_template;
-    queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", fullDatasourceName);
-    queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst));
-    queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast));
-    queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst));
-    queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(dtFirst));
-    queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_QUERY_END%%", INTERVAL_FMT.print(dtLast.plusMinutes(2)));
-    queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst));
-    queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_ADDED%%", Integer.toString(added));
-    queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_NUMEVENTS%%", Integer.toString(num_events));
-
-    // this query will probably be answered from the indexing tasks but possibly from 2 historical segments / 2 indexing
-    try {
-      this.queryHelper.testQueriesFromString(queryStr, 2);
-    }
-    catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-
-    LOG.info("Shutting down Kafka Supervisor");
-    indexer.shutdownSupervisor(supervisorId);
-
-    // wait for all kafka indexing tasks to finish
-    LOG.info("Waiting for all kafka indexing tasks to finish");
-    ITRetryUtil.retryUntilTrue(
-        () -> (indexer.getPendingTasks().size()
-               + indexer.getRunningTasks().size()
-               + indexer.getWaitingTasks().size()) == 0,
-        "Waiting for Tasks Completion"
-    );
-
-    // wait for segments to be handed off
-    try {
-      ITRetryUtil.retryUntil(
-          () -> coordinator.areSegmentsLoaded(fullDatasourceName),
-          true,
-          10000,
-          30,
-          "Real-time generated segments loaded"
-      );
-    }
-    catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-    LOG.info("segments are present");
-    segmentsExist = true;
-
-    // this query will be answered by at least 1 historical segment, most likely 2, and possibly up to all 4
-    try {
-      this.queryHelper.testQueriesFromString(queryStr, 2);
-    }
-    catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-  
-  private void addFilteredProperties(IntegrationTestingConfig config, Properties properties)
-  {
-    for (Map.Entry<String, String> entry : config.getProperties().entrySet()) {
-      if (entry.getKey().startsWith(TEST_PROPERTY_PREFIX)) {
-        properties.setProperty(entry.getKey().substring(TEST_PROPERTY_PREFIX.length()), entry.getValue());
-      }
-    }
-  }
-
-  void doTearDown()
-  {
-    if (config.manageKafkaTopic()) {
-      // delete kafka topic
-      AdminUtils.deleteTopic(zkUtils, TOPIC_NAME);
-    }
-
-    // remove segments
-    if (segmentsExist && fullDatasourceName != null) {
-      unloadAndKillData(fullDatasourceName);
-    }
-  }
-}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
new file mode 100644
index 0000000..ce769bf
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.utils.KafkaAdminClient;
+import org.apache.druid.testing.utils.KafkaEventWriter;
+import org.apache.druid.testing.utils.StreamAdminClient;
+import org.apache.druid.testing.utils.StreamEventWriter;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Function;
+
+public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamIndexingTest
+{
+  protected abstract boolean isKafkaWriterTransactionalEnabled();
+
+  @Override
+  StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config)
+  {
+    return new KafkaAdminClient(config.getKafkaHost());
+  }
+
+  @Override
+  public StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config)
+  {
+    return new KafkaEventWriter(config, isKafkaWriterTransactionalEnabled());
+  }
+
+  @Override
+  Function<String, String> generateStreamIngestionPropsTransform(String streamName,
+                                                                 String fullDatasourceName,
+                                                                 IntegrationTestingConfig config)
+  {
+    final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
+    final Properties consumerProperties = new Properties();
+    consumerProperties.putAll(consumerConfigs);
+    consumerProperties.setProperty("bootstrap.servers", config.getKafkaInternalHost());
+    return spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_TYPE%%",
+            "kafka"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_KEY%%",
+            "topic"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_VALUE%%",
+            streamName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%USE_EARLIEST_KEY%%",
+            "useEarliestOffset"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_KEY%%",
+            "consumerProperties"
+        );
+        return StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_VALUE%%",
+            jsonMapper.writeValueAsString(consumerProperties)
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  @Override
+  Function<String, String> generateStreamQueryPropsTransform(String streamName, String fullDatasourceName)
+  {
+    return spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1))
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_MINTIME%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_QUERY_START%%",
+            INTERVAL_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_QUERY_END%%",
+            INTERVAL_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1).plusMinutes(2))
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_RESPONSE_TIMESTAMP%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_ADDED%%",
+            Long.toString(getSumOfEventSequence(EVENTS_PER_SECOND) * TOTAL_NUMBER_OF_SECOND)
+        );
+        return StringUtils.replace(
+            spec,
+            "%%TIMESERIES_NUMEVENTS%%",
+            Integer.toString(EVENTS_PER_SECOND * TOTAL_NUMBER_OF_SECOND)
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+}
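
A minimal sketch of how the ingestion-props transform above is meant to be used from a concrete Kafka test. The real template is the stream_supervisor_spec_input_format.json resource (not shown here); the toy template string, topic, and datasource names below are illustrative only.

    // Hypothetical usage from within a concrete AbstractKafkaIndexingServiceTest subclass,
    // where config is the injected IntegrationTestingConfig.
    Function<String, String> transform =
        generateStreamIngestionPropsTransform("my_topic", "my_datasource", config);
    String resolved = transform.apply(
        "{\"type\":\"%%STREAM_TYPE%%\",\"%%TOPIC_KEY%%\":\"%%TOPIC_VALUE%%\",\"%%USE_EARLIEST_KEY%%\":true}"
    );
    // resolved is now: {"type":"kafka","topic":"my_topic","useEarliestOffset":true}
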
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
new file mode 100644
index 0000000..14c9cac
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.utils.KinesisAdminClient;
+import org.apache.druid.testing.utils.KinesisEventWriter;
+import org.apache.druid.testing.utils.StreamAdminClient;
+import org.apache.druid.testing.utils.StreamEventWriter;
+
+import java.util.function.Function;
+
+public abstract class AbstractKinesisIndexingServiceTest extends AbstractStreamIndexingTest
+{
+  @Override
+  StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception
+  {
+    return new KinesisAdminClient(config.getStreamEndpoint());
+  }
+
+  @Override
+  StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) throws Exception
+  {
+    return new KinesisEventWriter(config.getStreamEndpoint(), false);
+  }
+
+  @Override
+  Function<String, String> generateStreamIngestionPropsTransform(String streamName,
+                                                                 String fullDatasourceName,
+                                                                 IntegrationTestingConfig config)
+  {
+    return spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_TYPE%%",
+            "kinesis"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_KEY%%",
+            "stream"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TOPIC_VALUE%%",
+            streamName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%USE_EARLIEST_KEY%%",
+            "useEarliestSequenceNumber"
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_KEY%%",
+            "endpoint"
+        );
+        return StringUtils.replace(
+            spec,
+            "%%STREAM_PROPERTIES_VALUE%%",
+            jsonMapper.writeValueAsString(config.getStreamEndpoint())
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  @Override
+  Function<String, String> generateStreamQueryPropsTransform(String streamName, String fullDatasourceName)
+  {
+    return spec -> {
+      try {
+        spec = StringUtils.replace(
+            spec,
+            "%%DATASOURCE%%",
+            fullDatasourceName
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1))
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMEBOUNDARY_RESPONSE_MINTIME%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_QUERY_START%%",
+            INTERVAL_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_QUERY_END%%",
+            INTERVAL_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1).plusMinutes(2))
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_RESPONSE_TIMESTAMP%%",
+            TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
+        );
+        spec = StringUtils.replace(
+            spec,
+            "%%TIMESERIES_ADDED%%",
+            Long.toString(getSumOfEventSequence(EVENTS_PER_SECOND) * TOTAL_NUMBER_OF_SECOND)
+        );
+        return StringUtils.replace(
+            spec,
+            "%%TIMESERIES_NUMEVENTS%%",
+            Integer.toString(EVENTS_PER_SECOND * TOTAL_NUMBER_OF_SECOND)
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
+  }
+}
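
The Kinesis transform mirrors the Kafka one but resolves the generic placeholders to Kinesis-specific keys; in particular, %%STREAM_PROPERTIES_KEY%% / %%STREAM_PROPERTIES_VALUE%% become an "endpoint" string rather than a consumerProperties object. A minimal sketch using the same toy template as the Kafka example above (stream and datasource names are illustrative):

    // Hypothetical usage from within a concrete AbstractKinesisIndexingServiceTest subclass.
    Function<String, String> transform =
        generateStreamIngestionPropsTransform("my_stream", "my_datasource", config);
    String resolved = transform.apply(
        "{\"type\":\"%%STREAM_TYPE%%\",\"%%TOPIC_KEY%%\":\"%%TOPIC_VALUE%%\",\"%%USE_EARLIEST_KEY%%\":true}"
    );
    // resolved is now: {"type":"kinesis","stream":"my_stream","useEarliestSequenceNumber":true}
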
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
new file mode 100644
index 0000000..2f0c65a
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.utils.DruidClusterAdminClient;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.StreamAdminClient;
+import org.apache.druid.testing.utils.StreamEventWriter;
+import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
+{
+  static final DateTime FIRST_EVENT_TIME = DateTimes.of(1994, 4, 29, 1, 0);
+  // format for the querying interval
+  static final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
+  // format for the expected timestamp in a query response
+  static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
+  static final int EVENTS_PER_SECOND = 6;
+  static final int TOTAL_NUMBER_OF_SECOND = 10;
+  // Since this integration test can terminate or be killed unexpectedly, this tag is added to all streams created
+  // to help make stream cleanup easier. (Normally, streams should be cleaned up automatically by the teardown method.)
+  // The value of this tag is a timestamp that can be used by a lambda function to remove unused streams.
+  private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
+  private static final int STREAM_SHARD_COUNT = 2;
+  private static final long WAIT_TIME_MILLIS = 3 * 60 * 1000L;
+  private static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/stream_supervisor_spec_legacy_parser.json";
+  private static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/stream_supervisor_spec_input_format.json";
+  private static final String QUERIES_FILE = "/indexer/stream_index_queries.json";
+  private static final long CYCLE_PADDING_MS = 100;
+  private static final Logger LOG = new Logger(AbstractStreamIndexingTest.class);
+
+  @Inject
+  private DruidClusterAdminClient druidClusterAdminClient;
+
+  @Inject
+  private IntegrationTestingConfig config;
+
+  private StreamAdminClient streamAdminClient;
+  private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
+
+  abstract StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception;
+  abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) throws Exception;
+  abstract Function<String, String> generateStreamIngestionPropsTransform(String streamName,
+                                                                          String fullDatasourceName,
+                                                                          IntegrationTestingConfig config);
+  abstract Function<String, String> generateStreamQueryPropsTransform(String streamName, String fullDatasourceName);
+  public abstract String getTestNamePrefix();
+
+  protected void doBeforeClass() throws Exception
+  {
+    streamAdminClient = createStreamAdminClient(config);
+    wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS);
+  }
+
+  protected void doClassTeardown()
+  {
+    wikipediaStreamEventGenerator.shutdown();
+  }
+
+  protected void doTestIndexDataWithLegacyParserStableState() throws Exception
+  {
+    StreamEventWriter streamEventWriter = createStreamEventWriter(config);
+    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
+    try (
+        final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
+    ) {
+      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_LEGACY_PARSER));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
+      LOG.info("Submitted supervisor");
+      // Start data generator
+      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
+      verifyIngestedData(generatedTestConfig);
+    }
+    finally {
+      doMethodTeardown(generatedTestConfig, streamEventWriter);
+    }
+  }
+
+  protected void doTestIndexDataWithInputFormatStableState() throws Exception
+  {
+    StreamEventWriter streamEventWriter = createStreamEventWriter(config);
+    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
+    try (
+        final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
+    ) {
+      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
+      LOG.info("Submitted supervisor");
+      // Start data generator
+      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
+      verifyIngestedData(generatedTestConfig);
+    }
+    finally {
+      doMethodTeardown(generatedTestConfig, streamEventWriter);
+    }
+  }
+
+  void doTestIndexDataWithLosingCoordinator() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartCoordinatorContainer(), () -> druidClusterAdminClient.waitUntilCoordinatorReady());
+  }
+
+  void doTestIndexDataWithLosingOverlord() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartIndexerContainer(), () -> druidClusterAdminClient.waitUntilIndexerReady());
+  }
+
+  void doTestIndexDataWithLosingHistorical() throws Exception
+  {
+    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartHistoricalContainer(), () -> druidClusterAdminClient.waitUntilHistoricalReady());
+  }
+
+  protected void doTestIndexDataWithStartStopSupervisor() throws Exception
+  {
+    StreamEventWriter streamEventWriter = createStreamEventWriter(config);
+    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
+    try (
+        final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
+    ) {
+      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
+      LOG.info("Submitted supervisor");
+      // Start generating half of the data
+      int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Verify supervisor is healthy before suspension
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Suspend the supervisor
+      indexer.suspendSupervisor(generatedTestConfig.getSupervisorId());
+      // Start generating the remaining half of the data
+      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Resume the supervisor
+      indexer.resumeSupervisor(generatedTestConfig.getSupervisorId());
+      // Verify supervisor is healthy after suspension
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor can catch up with the stream
+      verifyIngestedData(generatedTestConfig);
+    }
+    finally {
+      doMethodTeardown(generatedTestConfig, streamEventWriter);
+    }
+  }
+
+  protected void doTestIndexDataWithStreamReshardSplit() throws Exception
+  {
+    // Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT * 2
+    testIndexWithStreamReshardHelper(STREAM_SHARD_COUNT * 2);
+  }
+
+  protected void doTestIndexDataWithStreamReshardMerge() throws Exception
+  {
+    // Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT / 2
+    testIndexWithStreamReshardHelper(STREAM_SHARD_COUNT / 2);
+  }
+
+  private void testIndexWithLosingNodeHelper(Runnable restartRunnable, Runnable waitForReadyRunnable) throws Exception
+  {
+    StreamEventWriter streamEventWriter = createStreamEventWriter(config);
+    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
+    try (
+        final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
+    ) {
+      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
+      LOG.info("Submitted supervisor");
+      // Start generating one third of the data (before restarting)
+      int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Verify supervisor is healthy before restart
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Restart Druid process
+      LOG.info("Restarting Druid process");
+      restartRunnable.run();
+      LOG.info("Restarted Druid process");
+      // Start generating one third of the data (while restarting)
+      int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
+      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Wait for Druid process to be available
+      LOG.info("Waiting for Druid process to be available");
+      waitForReadyRunnable.run();
+      LOG.info("Druid process is now available");
+      // Start generating remaining data (after restarting)
+      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
+      // Verify supervisor is healthy
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor ingested all data
+      verifyIngestedData(generatedTestConfig);
+    }
+    finally {
+      doMethodTeardown(generatedTestConfig, streamEventWriter);
+    }
+  }
+
+  private void testIndexWithStreamReshardHelper(int newShardCount) throws Exception
+  {
+    StreamEventWriter streamEventWriter = createStreamEventWriter(config);
+    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
+    try (
+        final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
+    ) {
+      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
+      LOG.info("Submitted supervisor");
+      // Start generating one third of the data (before resharding)
+      int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
+      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      // Verify supervisor is healthy before resharding
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Reshard the stream from STREAM_SHARD_COUNT to newShardCount and wait until the resharding starts
+      streamAdminClient.updatePartitionCount(generatedTestConfig.getStreamName(), newShardCount, true);
+      // Start generating one third of the data (while resharding)
+      int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
+      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      // Wait for stream to finish resharding
+      ITRetryUtil.retryUntil(
+          () -> streamAdminClient.isStreamActive(generatedTestConfig.getStreamName()),
+          true,
+          10000,
+          30,
+          "Waiting for stream to finish resharding"
+      );
+      ITRetryUtil.retryUntil(
+          () -> streamAdminClient.verfiyPartitionCountUpdated(generatedTestConfig.getStreamName(), STREAM_SHARD_COUNT, newShardCount),
+          true,
+          10000,
+          30,
+          "Waiting for stream to finish resharding"
+      );
+      // Start generating remaining data (after resharding)
+      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
+      // Verify supervisor is healthy after resharding
+      ITRetryUtil.retryUntil(
+          () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
+          true,
+          10000,
+          30,
+          "Waiting for supervisor to be healthy"
+      );
+      // Verify that supervisor can catch up with the stream
+      verifyIngestedData(generatedTestConfig);
+    }
+    finally {
+      doMethodTeardown(generatedTestConfig, streamEventWriter);
+    }
+  }
+
+  private void verifyIngestedData(GeneratedTestConfig generatedTestConfig) throws Exception
+  {
+    // Wait for supervisor to consume events
+    LOG.info("Waiting for [%s] millis for stream indexing tasks to consume events", WAIT_TIME_MILLIS);
+    Thread.sleep(WAIT_TIME_MILLIS);
+    // Query data
+    final String querySpec = generatedTestConfig.getStreamQueryPropsTransform().apply(getResourceAsString(QUERIES_FILE));
+    // this query will probably be answered from the indexing tasks, but possibly from 2 historical segments / 2 indexing tasks
+    this.queryHelper.testQueriesFromString(querySpec, 2);
+    LOG.info("Shutting down supervisor");
+    indexer.shutdownSupervisor(generatedTestConfig.getSupervisorId());
+    // wait for all indexing tasks to finish
+    LOG.info("Waiting for all indexing tasks to finish");
+    ITRetryUtil.retryUntilTrue(
+        () -> (indexer.getUncompletedTasksForDataSource(generatedTestConfig.getFullDatasourceName()).size() == 0),
+        "Waiting for Tasks Completion"
+    );
+    // wait for segments to be handed off
+    ITRetryUtil.retryUntil(
+        () -> coordinator.areSegmentsLoaded(generatedTestConfig.getFullDatasourceName()),
+        true,
+        10000,
+        30,
+        "Real-time generated segments loaded"
+    );
+
+    // this query will be answered by at least 1 historical segment, most likely 2, and possibly up to all 4
+    this.queryHelper.testQueriesFromString(querySpec, 2);
+  }
+
+  long getSumOfEventSequence(int numEvents)
+  {
+    return (numEvents * (1 + numEvents)) / 2;
+  }
+
+  private void doMethodTeardown(GeneratedTestConfig generatedTestConfig, StreamEventWriter streamEventWriter)
+  {
+    try {
+      streamEventWriter.flush();
+      streamEventWriter.shutdown();
+    }
+    catch (Exception e) {
+      // Best-effort cleanup, as the writer may have already been cleaned up
+      LOG.warn(e, "Failed to cleanup writer. This might be expected depending on the test method");
+    }
+    try {
+      indexer.shutdownSupervisor(generatedTestConfig.getSupervisorId());
+    }
+    catch (Exception e) {
+      // Best-effort cleanup, as the supervisor may have already been cleaned up
+      LOG.warn(e, "Failed to cleanup supervisor. This might be expected depending on the test method");
+    }
+    try {
+      unloader(generatedTestConfig.getFullDatasourceName());
+    }
+    catch (Exception e) {
+      // Best-effort cleanup, as the datasource may have already been cleaned up
+      LOG.warn(e, "Failed to cleanup datasource. This might be expected depending on the test method");
+    }
+    try {
+      streamAdminClient.deleteStream(generatedTestConfig.getStreamName());
+    }
+    catch (Exception e) {
+      // Best-effort cleanup, as the stream may have already been cleaned up
+      LOG.warn(e, "Failed to cleanup stream. This might be expected depending on the test method");
+    }
+  }
+
+  private class GeneratedTestConfig
+  {
+    private String streamName;
+    private String fullDatasourceName;
+    private String supervisorId;
+    private Function<String, String> streamIngestionPropsTransform;
+    private Function<String, String> streamQueryPropsTransform;
+
+    GeneratedTestConfig() throws Exception
+    {
+      streamName = getTestNamePrefix() + "_index_test_" + UUID.randomUUID();
+      String datasource = getTestNamePrefix() + "_indexing_service_test_" + UUID.randomUUID();
+      Map<String, String> tags = ImmutableMap.of(STREAM_EXPIRE_TAG, Long.toString(DateTimes.nowUtc().plusMinutes(30).getMillis()));
+      streamAdminClient.createStream(streamName, STREAM_SHARD_COUNT, tags);
+      ITRetryUtil.retryUntil(
+          () -> streamAdminClient.isStreamActive(streamName),
+          true,
+          10000,
+          30,
+          "Wait for stream active"
+      );
+      fullDatasourceName = datasource + config.getExtraDatasourceNameSuffix();
+      streamIngestionPropsTransform = generateStreamIngestionPropsTransform(streamName, fullDatasourceName, config);
+      streamQueryPropsTransform = generateStreamQueryPropsTransform(streamName, fullDatasourceName);
+    }
+
+    public String getSupervisorId()
+    {
+      return supervisorId;
+    }
+
+    public void setSupervisorId(String supervisorId)
+    {
+      this.supervisorId = supervisorId;
+    }
+
+    public String getStreamName()
+    {
+      return streamName;
+    }
+
+    public String getFullDatasourceName()
+    {
+      return fullDatasourceName;
+    }
+
+    public Function<String, String> getStreamIngestionPropsTransform()
+    {
+      return streamIngestionPropsTransform;
+    }
+
+    public Function<String, String> getStreamQueryPropsTransform()
+    {
+      return streamQueryPropsTransform;
+    }
+  }
+}
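
As a quick cross-check of the expected-value constants wired into the query transforms: getSumOfEventSequence assumes each second of generated data carries added values 1..EVENTS_PER_SECOND (the exact payloads are defined by WikipediaStreamEventStreamGenerator, not shown here), so with the defaults above the totals work out as follows.

    // Illustrative arithmetic only; mirrors getSumOfEventSequence(EVENTS_PER_SECOND) * TOTAL_NUMBER_OF_SECOND.
    int eventsPerSecond = 6;                                            // EVENTS_PER_SECOND
    int totalSeconds = 10;                                              // TOTAL_NUMBER_OF_SECOND
    long addedPerSecond = eventsPerSecond * (1 + eventsPerSecond) / 2;  // 1 + 2 + ... + 6 = 21
    long expectedAdded = addedPerSecond * totalSeconds;                 // 210 -> %%TIMESERIES_ADDED%%
    int expectedNumEvents = eventsPerSecond * totalSeconds;             // 60  -> %%TIMESERIES_NUMEVENTS%%
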
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java
new file mode 100644
index 0000000..99713a7
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+@Test(groups = TestNGGroup.KAFKA_INDEX_SLOW)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKafkaIndexingServiceNonTransactionalSerializedTest extends AbstractKafkaIndexingServiceTest
+{
+  @Override
+  protected boolean isKafkaWriterTransactionalEnabled()
+  {
+    return false;
+  }
+
+  @Override
+  public String getTestNamePrefix()
+  {
+    return "kafka_nontransactional_serialized";
+  }
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    doBeforeClass();
+  }
+
+  @AfterClass
+  public void tearDown()
+  {
+    doClassTeardown();
+  }
+
+  /**
+   * This test must be run individually since it affects and modifies the state of the Druid cluster
+   */
+  @Test
+  public void testKafkaIndexDataWithLosingCoordinator() throws Exception
+  {
+    doTestIndexDataWithLosingCoordinator();
+  }
+
+  /**
+   * This test must be run individually since it affects and modifies the state of the Druid cluster
+   */
+  @Test
+  public void testKafkaIndexDataWithLosingOverlord() throws Exception
+  {
+    doTestIndexDataWithLosingOverlord();
+  }
+
+  /**
+   * This test must be run individually since it affects and modifies the state of the Druid cluster
+   */
+  @Test
+  public void testKafkaIndexDataWithLosingHistorical() throws Exception
+  {
+    doTestIndexDataWithLosingHistorical();
+  }
+}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
deleted file mode 100644
index 30e4bab..0000000
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.indexer;
-
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.tests.TestNGGroup;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-@Test(groups = TestNGGroup.KAFKA_INDEX)
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITKafkaIndexingServiceTest extends AbstractKafkaIndexerTest
-{
-  private static final Logger LOG = new Logger(ITKafkaIndexingServiceTest.class);
-  private static final String DATASOURCE = "kafka_indexing_service_test";
-
-  @DataProvider
-  public static Object[][] testParams()
-  {
-    return new Object[][]{
-        {"legacy_parser"},
-        {"input_format"}
-    };
-  }
-
-  @Test(dataProvider = "testParams")
-  public void testKafka(String param)
-  {
-    final String supervisorSpecPath = "legacy_parser".equals(param)
-                                      ? INDEXER_FILE_LEGACY_PARSER
-                                      : INDEXER_FILE_INPUT_FORMAT;
-    LOG.info("Starting test: ITKafkaIndexingServiceTest");
-    doKafkaIndexTest(StringUtils.format("%s_%s", DATASOURCE, param), supervisorSpecPath, false);
-  }
-
-  @AfterMethod
-  public void afterClass()
-  {
-    LOG.info("teardown");
-    doTearDown();
-  }
-}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java
new file mode 100644
index 0000000..06bcf05
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+@Test(groups = TestNGGroup.TRANSACTIONAL_KAFKA_INDEX_SLOW)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKafkaIndexingServiceTransactionalSerializedTest extends AbstractKafkaIndexingServiceTest
+{
+  @Override
+  protected boolean isKafkaWriterTransactionalEnabled()
+  {
+    return true;
+  }
+
+  @Override
+  public String getTestNamePrefix()
+  {
+    return "kafka_transactional_serialized";
+  }
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    doBeforeClass();
+  }
+
+  @AfterClass
+  public void tearDown()
+  {
+    doClassTeardown();
+  }
+
+  /**
+   * This test must be run individually since it affects and modifies the state of the Druid cluster
+   */
+  @Test
+  public void testKafkaIndexDataWithLosingCoordinator() throws Exception
+  {
+    doTestIndexDataWithLosingCoordinator();
+  }
+
+  /**
+   * This test must be run individually since it affects and modifies the state of the Druid cluster
+   */
+  @Test
+  public void testKafkaIndexDataWithLosingOverlord() throws Exception
+  {
+    doTestIndexDataWithLosingOverlord();
+  }
+
+  /**
+   * This test must be run individually since it affects and modifies the state of the Druid cluster
+   */
+  @Test
+  public void testKafkaIndexDataWithLosingHistorical() throws Exception
+  {
+    doTestIndexDataWithLosingHistorical();
+  }
+}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalTest.java
deleted file mode 100644
index 04c52b2..0000000
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.indexer;
-
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.tests.TestNGGroup;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-/**
- * This is a test for the Kafka indexing service with transactional topics
- */
-@Test(groups = TestNGGroup.KAFKA_INDEX)
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITKafkaIndexingServiceTransactionalTest extends AbstractKafkaIndexerTest
-{
-  private static final Logger LOG = new Logger(ITKafkaIndexingServiceTransactionalTest.class);
-  private static final String DATASOURCE = "kafka_indexing_service_txn_test";
-
-  @DataProvider
-  public static Object[][] testParams()
-  {
-    return new Object[][]{
-        {"legacy_parser"},
-        {"input_format"}
-    };
-  }
-
-  @Test(dataProvider = "testParams")
-  public void testKafka(String param)
-  {
-    final String supervisorSpecPath = "legacy_parser".equals(param)
-                                      ? INDEXER_FILE_LEGACY_PARSER
-                                      : INDEXER_FILE_INPUT_FORMAT;
-    LOG.info("Starting test: ITKafkaIndexingServiceTransactionalTest");
-    doKafkaIndexTest(StringUtils.format("%s_%s", DATASOURCE, param), supervisorSpecPath, false);
-  }
-
-  @AfterMethod
-  public void afterClass()
-  {
-    LOG.info("teardown");
-    doTearDown();
-  }
-}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java
new file mode 100644
index 0000000..8e64abb
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+@Test(groups = TestNGGroup.KINESIS_INDEX)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKinesisIndexingServiceSerializedTest extends AbstractKinesisIndexingServiceTest
+{
+  @Override
+  public String getTestNamePrefix()
+  {
+    return "kinesis_serialized";
+  }
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    doBeforeClass();
+  }
+
+  @AfterClass
+  public void tearDown()
+  {
+    doClassTeardown();
+  }
+
+  /**
+   * This test must be run individually since it affects and modifies the state of the Druid cluster
+   */
+  @Test
+  public void testKinesisIndexDataWithLosingCoordinator() throws Exception
+  {
+    doTestIndexDataWithLosingCoordinator();
+  }
+
+  /**
+   * This test must be run individually since it affects and modifies the state of the Druid cluster.
+   */
+  @Test
+  public void testKinesisIndexDataWithLosingOverlord() throws Exception
+  {
+    doTestIndexDataWithLosingOverlord();
+  }
+
+  /**
+   * This test must be run individually since it affects and modifies the state of the Druid cluster.
+   */
+  @Test
+  public void testKinesisIndexDataWithLosingHistorical() throws Exception
+  {
+    doTestIndexDataWithLosingHistorical();
+  }
+}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
new file mode 100644
index 0000000..199530e
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.parallelized;
+
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+@Test(groups = TestNGGroup.KAFKA_INDEX)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKafkaIndexingServiceNonTransactionalParallelizedTest extends AbstractKafkaIndexingServiceTest
+{
+  @Override
+  protected boolean isKafkaWriterTransactionalEnabled()
+  {
+    return false;
+  }
+
+  @Override
+  public String getTestNamePrefix()
+  {
+    return "kafka_non_transactional_parallelized";
+  }
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    doBeforeClass();
+  }
+
+  @AfterClass
+  public void tearDown()
+  {
+    doClassTeardown();
+  }
+
+  /**
+   * This test can be run concurrently with other tests because it creates, modifies, and tears down a unique
+   * datasource and supervisor that are maintained and scoped to this test only.
+   */
+  @Test
+  public void testKafkaIndexDataWithLegacyParserStableState() throws Exception
+  {
+    doTestIndexDataWithLegacyParserStableState();
+  }
+
+  /**
+   * This test can be run concurrently with other tests because it creates, modifies, and tears down a unique
+   * datasource and supervisor that are maintained and scoped to this test only.
+   */
+  @Test
+  public void testKafkaIndexDataWithInputFormatStableState() throws Exception
+  {
+    doTestIndexDataWithInputFormatStableState();
+  }
+
+  /**
+   * This test can be run concurrently with other tests because it creates, modifies, and tears down a unique
+   * datasource and supervisor that are maintained and scoped to this test only.
+   */
+  @Test
+  public void testKafkaIndexDataWithStartStopSupervisor() throws Exception
+  {
+    doTestIndexDataWithStartStopSupervisor();
+  }
+
+  /**
+   * This test can be run concurrently with other tests because it creates, modifies, and tears down a unique
+   * datasource and supervisor that are maintained and scoped to this test only.
+   */
+  @Test
+  public void testKafkaIndexDataWithKafkaReshardSplit() throws Exception
+  {
+    doTestIndexDataWithStreamReshardSplit();
+  }
+}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceTransactionalParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceTransactionalParallelizedTest.java
new file mode 100644
index 0000000..7db3a7f
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceTransactionalParallelizedTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.parallelized;
+
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+@Test(groups = TestNGGroup.TRANSACTIONAL_KAFKA_INDEX)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKafkaIndexingServiceTransactionalParallelizedTest extends AbstractKafkaIndexingServiceTest
+{
+  @Override
+  protected boolean isKafkaWriterTransactionalEnabled()
+  {
+    return true;
+  }
+
+  @Override
+  public String getTestNamePrefix()
+  {
+    return "kafka_transactional_parallelized";
+  }
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    doBeforeClass();
+  }
+
+  @AfterClass
+  public void tearDown()
+  {
+    doClassTeardown();
+  }
+
+  /**
+   * This test can be run concurrently with other tests because it creates, modifies, and tears down a unique
+   * datasource and supervisor that are maintained and scoped to this test only.
+   */
+  @Test
+  public void testKafkaIndexDataWithLegacyParserStableState() throws Exception
+  {
+    doTestIndexDataWithLegacyParserStableState();
+  }
+
+  /**
+   * This test can be run concurrently with other tests because it creates, modifies, and tears down a unique
+   * datasource and supervisor that are maintained and scoped to this test only.
+   */
+  @Test
+  public void testKafkaIndexDataWithInputFormatStableState() throws Exception
+  {
+    doTestIndexDataWithInputFormatStableState();
+  }
+
+  /**
+   * This test can be run concurrently with other tests because it creates, modifies, and tears down a unique
+   * datasource and supervisor that are maintained and scoped to this test only.
+   */
+  @Test
+  public void testKafkaIndexDataWithStartStopSupervisor() throws Exception
+  {
+    doTestIndexDataWithStartStopSupervisor();
+  }
+
+  /**
+   * This test can be run concurrently with other tests because it creates, modifies, and tears down a unique
+   * datasource and supervisor that are maintained and scoped to this test only.
+   */
+  @Test
+  public void testKafkaIndexDataWithKafkaReshardSplit() throws Exception
+  {
+    doTestIndexDataWithStreamReshardSplit();
+  }
+}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java
new file mode 100644
index 0000000..38816dc
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.parallelized;
+
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.apache.druid.tests.indexer.AbstractKinesisIndexingServiceTest;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+@Test(groups = TestNGGroup.KINESIS_INDEX)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKinesisIndexingServiceParallelizedTest extends AbstractKinesisIndexingServiceTest
+{
+  @Override
+  public String getTestNamePrefix()
+  {
+    return "kinesis_parallelized";
+  }
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    doBeforeClass();
+  }
+
+  @AfterClass
+  public void tearDown()
+  {
+    doClassTeardown();
+  }
+
+  /**
+   * This test can be run concurrently with other tests because it creates, modifies, and tears down a unique
+   * datasource and supervisor that are maintained and scoped to this test only.
+   */
+  @Test
+  public void testKinesisIndexDataWithLegacyParserStableState() throws Exception
+  {
+    doTestIndexDataWithLegacyParserStableState();
+  }
+
+  /**
+   * This test can be run concurrently with other tests because it creates, modifies, and tears down a unique
+   * datasource and supervisor that are maintained and scoped to this test only.
+   */
+  @Test
+  public void testKinesisIndexDataWithInputFormatStableState() throws Exception
+  {
+    doTestIndexDataWithInputFormatStableState();
+  }
+
+  /**
+   * This test can be run concurrently with other tests because it creates, modifies, and tears down a unique
+   * datasource and supervisor that are maintained and scoped to this test only.
+   */
+  @Test
+  public void testKinesisIndexDataWithStartStopSupervisor() throws Exception
+  {
+    doTestIndexDataWithStartStopSupervisor();
+  }
+
+  /**
+   * This test can be run concurrently with other tests because it creates, modifies, and tears down a unique
+   * datasource and supervisor that are maintained and scoped to this test only.
+   */
+  @Test
+  public void testKinesisIndexDataWithKinesisReshardSplit() throws Exception
+  {
+    doTestIndexDataWithStreamReshardSplit();
+  }
+
+  /**
+   * This test can be run concurrently with other tests because it creates, modifies, and tears down a unique
+   * datasource and supervisor that are maintained and scoped to this test only.
+   */
+  @Test
+  public void testKinesisIndexDataWithKinesisReshardMerge() throws Exception
+  {
+    doTestIndexDataWithStreamReshardMerge();
+  }
+}
diff --git a/integration-tests/src/test/resources/indexer/kafka_index_queries.json b/integration-tests/src/test/resources/indexer/stream_index_queries.json
similarity index 100%
rename from integration-tests/src/test/resources/indexer/kafka_index_queries.json
rename to integration-tests/src/test/resources/indexer/stream_index_queries.json
diff --git a/integration-tests/src/test/resources/indexer/kafka_supervisor_spec_input_format.json b/integration-tests/src/test/resources/indexer/stream_supervisor_spec_input_format.json
similarity index 83%
rename from integration-tests/src/test/resources/indexer/kafka_supervisor_spec_input_format.json
rename to integration-tests/src/test/resources/indexer/stream_supervisor_spec_input_format.json
index 4ba59af..ce9bedc 100644
--- a/integration-tests/src/test/resources/indexer/kafka_supervisor_spec_input_format.json
+++ b/integration-tests/src/test/resources/indexer/stream_supervisor_spec_input_format.json
@@ -1,5 +1,5 @@
 {
-  "type": "kafka",
+  "type": "%%STREAM_TYPE%%",
   "dataSchema": {
     "dataSource": "%%DATASOURCE%%",
     "timestampSpec": {
@@ -39,18 +39,18 @@
     }
   },
   "tuningConfig": {
-    "type": "kafka",
+    "type": "%%STREAM_TYPE%%",
     "intermediatePersistPeriod": "PT30S",
     "maxRowsPerSegment": 5000000,
     "maxRowsInMemory": 500000
   },
   "ioConfig": {
-    "topic": "%%TOPIC%%",
-    "consumerProperties": %%CONSUMER_PROPERTIES%%,
+    "%%TOPIC_KEY%%": "%%TOPIC_VALUE%%",
+    "%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%,
     "taskCount": 2,
     "replicas": 1,
-    "taskDuration": "PT2M",
-    "useEarliestOffset": true,
+    "taskDuration": "PT5M",
+    "%%USE_EARLIEST_KEY%%": true,
     "inputFormat" : {
       "type" : "json"
     }
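
The renamed template above swaps the Kafka-specific ioConfig keys for %%...%% placeholders so the same spec file can be rendered for either Kafka or Kinesis ingestion. As a minimal sketch of how such placeholders could be filled in (illustrative only, not the harness code added by this commit; the Kinesis-side key names are assumptions), substitution might look like:

    // Illustrative sketch: fill the %%...%% placeholders of a stream supervisor spec template.
    // The Kafka-side keys ("topic", "consumerProperties", "useEarliestOffset") match the values the
    // template used to hard-code; the Kinesis-side keys are assumptions made for this example.
    import java.util.HashMap;
    import java.util.Map;

    public class StreamSpecTemplateExample
    {
      public static String render(String template, boolean isKafka, String topicOrStream, String propertiesJson)
      {
        Map<String, String> values = new HashMap<>();
        values.put("%%STREAM_TYPE%%", isKafka ? "kafka" : "kinesis");
        values.put("%%TOPIC_KEY%%", isKafka ? "topic" : "stream");
        values.put("%%TOPIC_VALUE%%", topicOrStream);
        values.put("%%STREAM_PROPERTIES_KEY%%", isKafka ? "consumerProperties" : "endpoint"); // "endpoint" is assumed
        values.put("%%STREAM_PROPERTIES_VALUE%%", propertiesJson);
        values.put("%%USE_EARLIEST_KEY%%", isKafka ? "useEarliestOffset" : "useEarliestSequenceNumber");

        String rendered = template;
        for (Map.Entry<String, String> entry : values.entrySet()) {
          rendered = rendered.replace(entry.getKey(), entry.getValue());
        }
        // %%DATASOURCE%% and any remaining placeholders would be replaced the same way.
        return rendered;
      }
    }
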
diff --git a/integration-tests/src/test/resources/indexer/kafka_supervisor_spec_legacy_parser.json b/integration-tests/src/test/resources/indexer/stream_supervisor_spec_legacy_parser.json
similarity index 84%
rename from integration-tests/src/test/resources/indexer/kafka_supervisor_spec_legacy_parser.json
rename to integration-tests/src/test/resources/indexer/stream_supervisor_spec_legacy_parser.json
index 511b65d..623aadf 100644
--- a/integration-tests/src/test/resources/indexer/kafka_supervisor_spec_legacy_parser.json
+++ b/integration-tests/src/test/resources/indexer/stream_supervisor_spec_legacy_parser.json
@@ -1,5 +1,5 @@
 {
-  "type": "kafka",
+  "type": "%%STREAM_TYPE%%",
   "dataSchema": {
     "dataSource": "%%DATASOURCE%%",
     "parser": {
@@ -45,17 +45,17 @@
     }
   },
   "tuningConfig": {
-    "type": "kafka",
+    "type": "%%STREAM_TYPE%%",
     "intermediatePersistPeriod": "PT30S",
     "maxRowsPerSegment": 5000000,
     "maxRowsInMemory": 500000
   },
   "ioConfig": {
-    "topic": "%%TOPIC%%",
-    "consumerProperties": %%CONSUMER_PROPERTIES%%,
+    "%%TOPIC_KEY%%": "%%TOPIC_VALUE%%",
+    "%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%,
     "taskCount": 2,
     "replicas": 1,
-    "taskDuration": "PT2M",
-    "useEarliestOffset": true
+    "taskDuration": "PT5M",
+    "%%USE_EARLIEST_KEY%%": true
   }
 }
diff --git a/integration-tests/src/test/resources/testng.xml b/integration-tests/src/test/resources/testng.xml
index 333029c..5a0735a 100644
--- a/integration-tests/src/test/resources/testng.xml
+++ b/integration-tests/src/test/resources/testng.xml
@@ -21,14 +21,21 @@
 
 
 <suite name="IntegrationTestSuite">
-<listeners>
-  <listener class-name="org.apache.druid.testing.utils.LoggerListener" />
-</listeners>
-<test name="AllTests">
-<packages>
-  <package name="org.apache.druid.tests.*">
-    <exclude name="org.apache.druid.tests.hadoop"/>
-  </package>
-</packages>
-</test>
+  <listeners>
+    <listener class-name="org.apache.druid.testing.utils.LoggerListener" />
+    <listener class-name="org.apache.druid.testing.utils.SuiteListener" />
+  </listeners>
+  <test name="AllSerializedTests">
+    <packages>
+      <package name="org.apache.druid.tests.*">
+        <exclude name="org.apache.druid.tests.hadoop"/>
+        <exclude name="org.apache.druid.tests.parallelized"/>
+      </package>
+    </packages>
+  </test>
+  <test name="AllParallelizedTests" parallel="methods" thread-count="2">
+    <packages>
+      <package name="org.apache.druid.tests.parallelized.*"/>
+    </packages>
+  </test>
 </suite>
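
With the suite split above, the existing tests stay in a serialized <test> block while everything under org.apache.druid.tests.parallelized runs with parallel="methods" and a thread count of 2. For that to be safe, each test method has to own a uniquely named datasource and supervisor, as the Javadoc on the parallelized tests notes. A minimal sketch of one way to derive such a per-method name (illustrative only, not the code in this commit):

    // Illustrative sketch: build a datasource/stream name that is unique to one test method so that
    // methods scheduled concurrently by TestNG cannot collide on shared cluster state.
    import java.util.UUID;

    public class UniqueTestNameExample
    {
      public static String uniqueName(String testNamePrefix, String methodName)
      {
        // e.g. "kafka_transactional_parallelized_testKafkaIndexDataWithStartStopSupervisor_5f3a..."
        return testNamePrefix + "_" + methodName + "_" + UUID.randomUUID().toString().replace("-", "");
      }
    }
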
diff --git a/integration-tests/stop_cluster.sh b/integration-tests/stop_cluster.sh
index 4ce4268..2828a0f 100755
--- a/integration-tests/stop_cluster.sh
+++ b/integration-tests/stop_cluster.sh
@@ -14,6 +14,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+# Skip stopping docker if flag set (For use during development)
+if [ -n "$DRUID_INTEGRATION_TEST_SKIP_STOP_DOCKER" ] && [ "$DRUID_INTEGRATION_TEST_SKIP_STOP_DOCKER" == true ]
+  then
+    exit 0
+  fi
+
 for node in druid-historical druid-coordinator druid-overlord druid-router druid-router-permissive-tls druid-router-no-client-auth-tls druid-router-custom-check-tls druid-broker druid-middlemanager druid-zookeeper-kafka druid-metadata-storage druid-it-hadoop;
 
 do
diff --git a/pom.xml b/pom.xml
index 2dd1bb5..a4ad231 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1192,7 +1192,7 @@
             <dependency>
                 <groupId>org.testng</groupId>
                 <artifactId>testng</artifactId>
-                <version>6.8.7</version>
+                <version>6.14.3</version>
             </dependency>
             <dependency>
                 <groupId>org.hamcrest</groupId>
@@ -1249,7 +1249,6 @@
                         <!-- Ignore non-production code -->
                         <exclude>org/apache/druid/benchmark/**/*</exclude>  <!-- benchmarks -->
                         <exclude>org/apache/druid/**/*Benchmark*</exclude>  <!-- benchmarks -->
-                        <exclude>org/testng/DruidTestRunnerFactory*</exclude>  <!-- benchmarks -->
                         <exclude>org/apache/druid/testing/**/*</exclude>  <!-- integration-tests -->
                     </excludes>
                 </configuration>

