You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2020/04/29 20:18:19 UTC

[druid] branch master updated: Integration tests for stream ingestion with various data formats (#9783)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 39722bd  Integration tests for stream ingestion with various data formats (#9783)
39722bd is described below

commit 39722bd0646464ca67d1a0bf6966c0b7e4aedaf9
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Wed Apr 29 13:18:01 2020 -0700

    Integration tests for stream ingestion with various data formats (#9783)
    
    * Integration tests for stream ingestion with various data formats
    
    * fix npe
    
    * better logging; fix tsv
    
    * fix tsv
    
    * exclude kinesis from travis
    
    * some readme
---
 .travis.yml                                        |  12 +-
 integration-tests/README.md                        |   5 +-
 integration-tests/pom.xml                          |  14 +
 .../clients/OverlordResourceTestClient.java        |   2 +-
 .../druid/testing/utils/AvroEventSerializer.java   | 131 ++++++++
 .../druid/testing/utils/CsvEventSerializer.java    |  57 ++++
 ...enerator.java => DelimitedEventSerializer.java} |  34 +-
 .../druid/testing/utils/EventSerializer.java       |  50 +++
 .../druid/testing/utils/JsonEventSerializer.java   |  57 ++++
 .../druid/testing/utils/KafkaEventWriter.java      |  30 +-
 .../druid/testing/utils/KinesisEventWriter.java    |  13 +-
 .../apache/druid/testing/utils/LoggerListener.java |  24 +-
 .../druid/testing/utils/StreamEventWriter.java     |  25 +-
 .../druid/testing/utils/StreamGenerator.java       |   2 -
 .../utils/StreamVerifierSyntheticEvent.java        | 104 ------
 .../testing/utils/SyntheticStreamGenerator.java    |  46 +--
 .../utils/WikipediaStreamEventStreamGenerator.java |  48 +--
 .../java/org/apache/druid/tests/TestNGGroup.java   |  10 +
 .../druid/tests/indexer/AbstractIndexerTest.java   |  34 +-
 .../indexer/AbstractKafkaIndexingServiceTest.java  |  39 ++-
 .../AbstractKinesisIndexingServiceTest.java        |  36 ++-
 .../tests/indexer/AbstractStreamIndexingTest.java  | 350 +++++++++++++++------
 ...exingServiceNonTransactionalSerializedTest.java |  19 +-
 ...IndexingServiceTransactionalSerializedTest.java |  19 +-
 .../ITKinesisIndexingServiceSerializedTest.java    |  13 +-
 .../ITKafkaIndexingServiceDataFormatTest.java      |  99 ++++++
 ...ingServiceNonTransactionalParallelizedTest.java |  37 +--
 ...dexingServiceTransactionalParallelizedTest.java |  37 +--
 .../ITKinesisIndexingServiceDataFormatTest.java    |  96 ++++++
 .../ITKinesisIndexingServiceParallelizedTest.java  |  33 +-
 .../stream_supervisor_spec_legacy_parser.json      |  61 ----
 .../stream/data/avro/parser/input_row_parser.json  |  39 +++
 .../stream/data/avro/serializer/serializer.json    |   3 +
 .../stream/data/csv/input_format/input_format.json |   4 +
 .../stream/data/csv/parser/input_row_parser.json   |  16 +
 .../stream/data/csv/serializer/serializer.json     |   3 +
 .../data/json/input_format/input_format.json       |   3 +
 .../stream/data/json/parser/input_row_parser.json  |  15 +
 .../stream/data/json/serializer/serializer.json    |   3 +
 .../data/supervisor_spec_template.json}            |   5 +-
 .../stream/data/tsv/input_format/input_format.json |   4 +
 .../stream/data/tsv/parser/input_row_parser.json   |  16 +
 .../stream/data/tsv/serializer/serializer.json     |   3 +
 .../queries}/stream_index_queries.json             |   0
 integration-tests/src/test/resources/testng.xml    |   2 +-
 45 files changed, 1113 insertions(+), 540 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 93ddf47..770c8fc 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -337,6 +337,14 @@ jobs:
       script: *run_integration_test
       after_failure: *integration_test_diags
 
+    - &integration_kafka_format_tests
+        name: "(Compile=openjdk8, Run=openjdk8) Kafka index integration test with various formats"
+        jdk: openjdk8
+        services: *integration_test_services
+        env: TESTNG_GROUPS='-Dgroups=kafka-data-format' JVM_RUNTIME='-Djvm.runtime=8'
+        script: *run_integration_test
+        after_failure: *integration_test_diags
+
     - &integration_query
       name: "(Compile=openjdk8, Run=openjdk8) query integration test"
       jdk: openjdk8
@@ -365,7 +373,7 @@ jobs:
       name: "(Compile=openjdk8, Run=openjdk8) other integration test"
       jdk: openjdk8
       services: *integration_test_services
-      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=8'
+      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format' JVM_RUNTIME='-Djvm.runtime=8'
       script: *run_integration_test
       after_failure: *integration_test_diags
     # END - Integration tests for Compile with Java 8 and Run with Java 8
@@ -399,7 +407,7 @@ jobs:
     - <<: *integration_tests
       name: "(Compile=openjdk8, Run=openjdk11) other integration test"
       jdk: openjdk8
-      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow' JVM_RUNTIME='-Djvm.runtime=11'
+      env: TESTNG_GROUPS='-DexcludedGroups=batch-index,perfect-rollup-parallel-batch-index,kafka-index,query,realtime-index,security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format' JVM_RUNTIME='-Djvm.runtime=11'
     # END - Integration tests for Compile with Java 8 and Run with Java 11
 
     - name: "security vulnerabilities"
diff --git a/integration-tests/README.md b/integration-tests/README.md
index f390e85..897c889 100644
--- a/integration-tests/README.md
+++ b/integration-tests/README.md
@@ -319,7 +319,10 @@ Refer ITIndexerTest as an example on how to use dependency Injection
 By default, test methods in a test class will be run in sequential order one at a time. Test methods for a given test 
 class can be set to run in parallel (multiple test methods of each class running at the same time) by excluding
 the given class/package from the "AllSerializedTests" test tag section and including it in the "AllParallelizedTests" 
-test tag section in integration-tests/src/test/resources/testng.xml  
+test tag section in integration-tests/src/test/resources/testng.xml. TestNG uses two parameters, i.e.,
+`thread-count` and `data-provider-thread-count`, for parallel test execution, which are set to 2 for Druid integration tests.
+You may want to modify those values for faster execution.
+See https://testng.org/doc/documentation-main.html#parallel-running and https://testng.org/doc/documentation-main.html#parameters-dataproviders for details.
 Please be mindful when adding tests to the "AllParallelizedTests" test tag that the tests can run in parallel with
 other tests from the same class at the same time. i.e. test does not modify/restart/stop the druid cluster or other dependency containers,
 test does not use excessive memory starving other concurent task, test does not modify and/or use other task, 
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 4c2ef70..e7db3a2 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -38,6 +38,14 @@
             <version>0.13.1</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.opencsv</groupId>
+            <artifactId>opencsv</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.amazonaws</groupId>
             <artifactId>aws-java-sdk-kinesis</artifactId>
             <version>${aws.sdk.version}</version>
@@ -83,6 +91,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.druid.extensions</groupId>
+            <artifactId>druid-avro-extensions</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.druid.extensions</groupId>
             <artifactId>druid-s3-extensions</artifactId>
             <version>${project.parent.version}</version>
             <scope>runtime</scope>
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index 7995595..2b06ba5 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -253,7 +253,7 @@ public class OverlordResourceTestClient
       ).get();
       if (!response.getStatus().equals(HttpResponseStatus.OK)) {
         throw new ISE(
-            "Error while submitting supervisor to overlord, response [%s %s]",
+            "Error while submitting supervisor to overlord, response [%s: %s]",
             response.getStatus(),
             response.getContent()
         );
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroEventSerializer.java
new file mode 100644
index 0000000..284fd09
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/AvroEventSerializer.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.druid.java.util.common.Pair;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class AvroEventSerializer implements EventSerializer
+{
+  public static final String TYPE = "avro";
+
+  private static final Schema SCHEMA = SchemaBuilder
+      .record("wikipedia")
+      .namespace("org.apache.druid")
+      .fields()
+      .requiredString("timestamp")
+      .requiredString("page")
+      .requiredString("language")
+      .requiredString("user")
+      .requiredString("unpatrolled")
+      .requiredString("newPage")
+      .requiredString("robot")
+      .requiredString("anonymous")
+      .requiredString("namespace")
+      .requiredString("continent")
+      .requiredString("country")
+      .requiredString("region")
+      .requiredString("city")
+      .requiredInt("added")
+      .requiredInt("deleted")
+      .requiredInt("delta")
+      .endRecord();
+
+  private final DatumWriter<Object> writer = new GenericDatumWriter<>(SCHEMA);
+
+  @Override
+  public byte[] serialize(List<Pair<String, Object>> event) throws IOException
+  {
+    final WikipediaRecord record = new WikipediaRecord();
+    event.forEach(pair -> record.put(pair.lhs, pair.rhs));
+    final ByteArrayOutputStream out = new ByteArrayOutputStream();
+    final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+    writer.write(record, encoder);
+    encoder.flush();
+    out.close();
+    return out.toByteArray();
+  }
+
+  @Override
+  public void close()
+  {
+  }
+
+  private static class WikipediaRecord implements GenericRecord
+  {
+    private final Map<String, Object> event = new HashMap<>();
+    private final BiMap<Integer, String> indexes = HashBiMap.create(SCHEMA.getFields().size());
+
+    private int nextIndex = 0;
+
+    @Override
+    public void put(String key, Object v)
+    {
+      event.put(key, v);
+      indexes.inverse().computeIfAbsent(key, k -> nextIndex++);
+    }
+
+    @Override
+    public Object get(String key)
+    {
+      return event.get(key);
+    }
+
+    @Override
+    public void put(int i, Object v)
+    {
+      final String key = indexes.get(i);
+      if (key == null) {
+        throw new IndexOutOfBoundsException();
+      }
+      put(key, v);
+    }
+
+    @Override
+    public Object get(int i)
+    {
+      final String key = indexes.get(i);
+      if (key == null) {
+        throw new IndexOutOfBoundsException();
+      }
+      return get(key);
+    }
+
+    @Override
+    public Schema getSchema()
+    {
+      return SCHEMA;
+    }
+  }
+}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/CsvEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/CsvEventSerializer.java
new file mode 100644
index 0000000..77d5455
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/CsvEventSerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.opencsv.CSVWriter;
+import org.apache.druid.java.util.common.Pair;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+public class CsvEventSerializer implements EventSerializer
+{
+  public static final String TYPE = "csv";
+
+  private final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+  private final CSVWriter writer = new CSVWriter(
+      new BufferedWriter(new OutputStreamWriter(bos, StandardCharsets.UTF_8))
+  );
+
+  @Override
+  public byte[] serialize(List<Pair<String, Object>> event) throws IOException
+  {
+    //noinspection ConstantConditions
+    writer.writeNext(event.stream().map(pair -> pair.rhs.toString()).toArray(String[]::new));
+    writer.flush();
+    final byte[] serialized = bos.toByteArray();
+    bos.reset();
+    return serialized;
+  }
+
+  @Override
+  public void close() throws IOException
+  {
+    writer.close();
+  }
+}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierEventGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/DelimitedEventSerializer.java
similarity index 51%
rename from integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierEventGenerator.java
rename to integration-tests/src/main/java/org/apache/druid/testing/utils/DelimitedEventSerializer.java
index bb56c79..d30e21d 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierEventGenerator.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/DelimitedEventSerializer.java
@@ -19,37 +19,25 @@
 
 package org.apache.druid.testing.utils;
 
-import org.apache.druid.java.util.common.DateTimes;
-import org.joda.time.DateTime;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
 
-import java.util.UUID;
+import java.util.List;
+import java.util.stream.Collectors;
 
-public class StreamVerifierEventGenerator extends SyntheticStreamGenerator
+public class DelimitedEventSerializer implements EventSerializer
 {
-  public StreamVerifierEventGenerator(int eventsPerSeconds, long cyclePaddingMs)
-  {
-    super(eventsPerSeconds, cyclePaddingMs);
-  }
+  public static final String TYPE = "tsv";
 
   @Override
-  Object getEvent(int i, DateTime timestamp)
+  public byte[] serialize(List<Pair<String, Object>> event)
   {
-    return StreamVerifierSyntheticEvent.of(
-        UUID.randomUUID().toString(),
-        timestamp.getMillis(),
-        DateTimes.nowUtc().getMillis(),
-        i,
-        i == getEventsPerSecond() ? getSumOfEventSequence(getEventsPerSecond()) : null,
-        i == 1
-    );
+    //noinspection ConstantConditions
+    return StringUtils.toUtf8(event.stream().map(pair -> pair.rhs.toString()).collect(Collectors.joining("\t")));
   }
 
-
-  /**
-   * Assumes the first number in the sequence is 1, incrementing by 1, until numEvents.
-   */
-  private long getSumOfEventSequence(int numEvents)
+  @Override
+  public void close()
   {
-    return (numEvents * (1 + numEvents)) / 2;
   }
 }
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java
new file mode 100644
index 0000000..014d8c8
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/EventSerializer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
+import org.apache.druid.java.util.common.Pair;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * EventSerializer is for serializing an event into a byte array.
+ * This interface is used to write generated events on stream processing systems such as Kafka or Kinesis
+ * in integration tests.
+ *
+ * @see SyntheticStreamGenerator
+ * @see StreamEventWriter
+ */
+@JsonTypeInfo(use = Id.NAME, property = "type")
+@JsonSubTypes(value = {
+    @Type(name = JsonEventSerializer.TYPE, value = JsonEventSerializer.class),
+    @Type(name = CsvEventSerializer.TYPE, value = CsvEventSerializer.class),
+    @Type(name = DelimitedEventSerializer.TYPE, value = DelimitedEventSerializer.class),
+    @Type(name = AvroEventSerializer.TYPE, value = AvroEventSerializer.class)
+})
+public interface EventSerializer extends Closeable
+{
+  byte[] serialize(List<Pair<String, Object>> event) throws IOException;
+}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/JsonEventSerializer.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/JsonEventSerializer.java
new file mode 100644
index 0000000..2d27660
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/JsonEventSerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.Pair;
+
+import java.util.List;
+import java.util.Map;
+
+public class JsonEventSerializer implements EventSerializer
+{
+  public static final String TYPE = "json";
+
+  private final ObjectMapper jsonMapper;
+
+  @JsonCreator
+  public JsonEventSerializer(@JacksonInject @Json ObjectMapper jsonMapper)
+  {
+    this.jsonMapper = jsonMapper;
+  }
+
+  @Override
+  public byte[] serialize(List<Pair<String, Object>> event) throws JsonProcessingException
+  {
+    Map<String, Object> map = Maps.newHashMapWithExpectedSize(event.size());
+    event.forEach(pair -> map.put(pair.lhs, pair.rhs));
+    return jsonMapper.writeValueAsBytes(map);
+  }
+
+  @Override
+  public void close()
+  {
+  }
+}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
index f7ec755..14b5714 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
@@ -31,12 +31,13 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 public class KafkaEventWriter implements StreamEventWriter
 {
   private static final String TEST_PROPERTY_PREFIX = "kafka.test.property.";
-  private final KafkaProducer<String, String> producer;
+  private final KafkaProducer<String, byte[]> producer;
   private final boolean txnEnabled;
   private final List<Future<RecordMetadata>> pendingWriteRecords = new ArrayList<>();
 
@@ -57,7 +58,7 @@ public class KafkaEventWriter implements StreamEventWriter
     this.producer = new KafkaProducer<>(
         properties,
         new StringSerializer(),
-        new StringSerializer()
+        new ByteArraySerializer()
     );
     if (txnEnabled) {
       producer.initTransactions();
@@ -91,25 +92,42 @@ public class KafkaEventWriter implements StreamEventWriter
   }
 
   @Override
-  public void write(String topic, String event)
+  public void write(String topic, byte[] event)
   {
     Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, event));
     pendingWriteRecords.add(future);
   }
 
   @Override
-  public void shutdown()
+  public void close()
   {
+    flush();
     producer.close();
   }
 
   @Override
-  public void flush() throws Exception
+  public void flush()
   {
+    Exception e = null;
     for (Future<RecordMetadata> future : pendingWriteRecords) {
-      future.get();
+      try {
+        future.get();
+      }
+      catch (InterruptedException | ExecutionException ex) {
+        if (ex instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+        if (e == null) {
+          e = ex;
+        } else {
+          e.addSuppressed(ex);
+        }
+      }
     }
     pendingWriteRecords.clear();
+    if (e != null) {
+      throw new RuntimeException(e);
+    }
   }
 
   private void addFilteredProperties(IntegrationTestingConfig config, Properties properties)
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java
index 0377e9e..39b700c 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java
@@ -25,17 +25,13 @@ import com.amazonaws.services.kinesis.producer.KinesisProducer;
 import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
 import com.amazonaws.util.AwsHostNameUtils;
 import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.druid.java.util.common.logger.Logger;
 
 import java.io.FileInputStream;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.util.Properties;
 
 public class KinesisEventWriter implements StreamEventWriter
 {
-  private static final Logger LOG = new Logger(KinesisEventWriter.class);
-
   private final KinesisProducer kinesisProducer;
 
   public KinesisEventWriter(String endpoint, boolean aggregate) throws Exception
@@ -82,20 +78,19 @@ public class KinesisEventWriter implements StreamEventWriter
   }
 
   @Override
-  public void write(String streamName, String event)
+  public void write(String streamName, byte[] event)
   {
     kinesisProducer.addUserRecord(
         streamName,
         DigestUtils.sha1Hex(event),
-        ByteBuffer.wrap(event.getBytes(StandardCharsets.UTF_8))
+        ByteBuffer.wrap(event)
     );
   }
 
   @Override
-  public void shutdown()
+  public void close()
   {
-    LOG.info("Shutting down Kinesis client");
-    kinesisProducer.flushSync();
+    flush();
   }
 
   @Override
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/LoggerListener.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/LoggerListener.java
index 7282396..e74dc2f 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/LoggerListener.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/LoggerListener.java
@@ -19,10 +19,13 @@
 
 package org.apache.druid.testing.utils;
 
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.testng.ITestResult;
 import org.testng.TestListenerAdapter;
 
+import java.util.Arrays;
+
 public class LoggerListener extends TestListenerAdapter
 {
   private static final Logger LOG = new Logger(LoggerListener.class);
@@ -30,25 +33,38 @@ public class LoggerListener extends TestListenerAdapter
   @Override
   public void onTestFailure(ITestResult tr)
   {
-    LOG.info("[%s] -- Test method failed", tr.getName());
+    LOG.error(tr.getThrowable(), "Failed %s", formatTestName(tr));
   }
 
   @Override
   public void onTestSkipped(ITestResult tr)
   {
-    LOG.info("[%s] -- Test method skipped", tr.getName());
+    LOG.warn("Skipped %s", formatTestName(tr));
   }
 
   @Override
   public void onTestSuccess(ITestResult tr)
   {
-    LOG.info("[%s] -- Test method passed", tr.getName());
+    LOG.info("Passed %s", formatTestName(tr));
   }
 
   @Override
   public void onTestStart(ITestResult tr)
   {
-    LOG.info("[%s] -- TEST START", tr.getName());
+    LOG.info("Starting %s", formatTestName(tr));
   }
 
+  private static String formatTestName(ITestResult tr)
+  {
+    if (tr.getParameters().length == 0) {
+      return StringUtils.format("[%s.%s]", tr.getTestClass().getName(), tr.getName());
+    } else {
+      return StringUtils.format(
+          "[%s.%s] with parameters %s",
+          tr.getTestClass().getName(),
+          tr.getName(),
+          Arrays.toString(tr.getParameters())
+      );
+    }
+  }
 }
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java
index 5d25916..747cbd8 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java
@@ -20,21 +20,32 @@
 package org.apache.druid.testing.utils;
 
 
+import java.io.Closeable;
+
 /**
  * This interface is use to write test event data to the underlying stream (such as Kafka, Kinesis)
  * This can also be use with {@link StreamGenerator} to write particular set of test data
  */
-public interface StreamEventWriter
+public interface StreamEventWriter extends Closeable
 {
-  void write(String topic, String event);
-
-  void shutdown();
-
-  void flush() throws Exception;
-
   boolean isTransactionEnabled();
 
   void initTransaction();
 
   void commitTransaction();
+
+  void write(String topic, byte[] event);
+
+  /**
+   * Flush pending writes on the underlying stream. This method is synchronous and waits until the flush completes.
+   * Note that this method is not interruptible
+   */
+  void flush();
+
+  /**
+   * Close this writer. Any resource should be cleaned up when this method is called.
+   * Implementations must call {@link #flush()} before closing the writer.
+   */
+  @Override
+  void close();
 }
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamGenerator.java
index f2d1f48..015cc91 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamGenerator.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamGenerator.java
@@ -26,6 +26,4 @@ public interface StreamGenerator
   void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds);
 
   void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds, DateTime overrrideFirstEventTime);
-
-  void shutdown();
 }
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierSyntheticEvent.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierSyntheticEvent.java
deleted file mode 100644
index e8c314a..0000000
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamVerifierSyntheticEvent.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.testing.utils;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-public class StreamVerifierSyntheticEvent
-{
-  private String id;
-  private long groupingTimestamp;
-  private long insertionTimestamp;
-  private long sequenceNumber;
-  private Long expectedSequenceNumberSum;
-  private boolean firstEvent;
-
-  public StreamVerifierSyntheticEvent(
-      String id,
-      long groupingTimestamp,
-      long insertionTimestamp,
-      long sequenceNumber,
-      Long expectedSequenceNumberSum,
-      boolean firstEvent
-  )
-  {
-    this.id = id;
-    this.groupingTimestamp = groupingTimestamp;
-    this.insertionTimestamp = insertionTimestamp;
-    this.sequenceNumber = sequenceNumber;
-    this.expectedSequenceNumberSum = expectedSequenceNumberSum;
-    this.firstEvent = firstEvent;
-  }
-
-  @JsonProperty
-  public String getId()
-  {
-    return id;
-  }
-
-  @JsonProperty
-  public long getGroupingTimestamp()
-  {
-    return groupingTimestamp;
-  }
-
-  @JsonProperty
-  public long getInsertionTimestamp()
-  {
-    return insertionTimestamp;
-  }
-
-  @JsonProperty
-  public long getSequenceNumber()
-  {
-    return sequenceNumber;
-  }
-
-  @JsonProperty
-  public Long getExpectedSequenceNumberSum()
-  {
-    return expectedSequenceNumberSum;
-  }
-
-  @JsonProperty
-  public Integer getFirstEventFlag()
-  {
-    return firstEvent ? 1 : null;
-  }
-
-  public static StreamVerifierSyntheticEvent of(
-      String id,
-      long groupingTimestamp,
-      long insertionTimestamp,
-      long sequenceNumber,
-      Long expectedSequenceNumberSum,
-      boolean firstEvent
-  )
-  {
-    return new StreamVerifierSyntheticEvent(
-        id,
-        groupingTimestamp,
-        insertionTimestamp,
-        sequenceNumber,
-        expectedSequenceNumberSum,
-        firstEvent
-    );
-  }
-}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
index f2bfde8..c68db10 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
@@ -19,32 +19,18 @@
 
 package org.apache.druid.testing.utils;
 
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.databind.InjectableValues;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.joda.time.DateTime;
 
+import java.util.List;
+
 public abstract class SyntheticStreamGenerator implements StreamGenerator
 {
-  private static final Logger log = new Logger(SyntheticStreamGenerator.class);
-  static final ObjectMapper MAPPER = new DefaultObjectMapper();
-
-  static {
-    MAPPER.setInjectableValues(
-        new InjectableValues.Std()
-            .addValue(ObjectMapper.class.getName(), MAPPER)
-    );
-    MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
-  }
-
-  public int getEventsPerSecond()
-  {
-    return eventsPerSecond;
-  }
+  private static final Logger LOG = new Logger(SyntheticStreamGenerator.class);
 
+  private final EventSerializer serializer;
   private final int eventsPerSecond;
 
   // When calculating rates, leave this buffer to minimize overruns where we're still writing messages from the previous
@@ -52,13 +38,14 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
   // second to begin.
   private final long cyclePaddingMs;
 
-  public SyntheticStreamGenerator(int eventsPerSecond, long cyclePaddingMs)
+  public SyntheticStreamGenerator(EventSerializer serializer, int eventsPerSecond, long cyclePaddingMs)
   {
+    this.serializer = serializer;
     this.eventsPerSecond = eventsPerSecond;
     this.cyclePaddingMs = cyclePaddingMs;
   }
 
-  abstract Object getEvent(int row, DateTime timestamp);
+  abstract List<Pair<String, Object>> newEvent(int row, DateTime timestamp);
 
   @Override
   public void run(String streamTopic, StreamEventWriter streamEventWriter, int totalNumberOfSeconds)
@@ -83,12 +70,12 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
       try {
         long sleepMillis = nowCeilingToSecond.getMillis() - DateTimes.nowUtc().getMillis();
         if (sleepMillis > 0) {
-          log.info("Waiting %s ms for next run cycle (at %s)", sleepMillis, nowCeilingToSecond);
+          LOG.info("Waiting %s ms for next run cycle (at %s)", sleepMillis, nowCeilingToSecond);
           Thread.sleep(sleepMillis);
           continue;
         }
 
-        log.info(
+        LOG.info(
             "Beginning run cycle with %s events, target completion time: %s",
             eventsPerSecond,
             nowCeilingToSecond.plusSeconds(1).minus(cyclePaddingMs)
@@ -99,11 +86,11 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
         }
 
         for (int i = 1; i <= eventsPerSecond; i++) {
-          streamEventWriter.write(streamTopic, MAPPER.writeValueAsString(getEvent(i, eventTimestamp)));
+          streamEventWriter.write(streamTopic, serializer.serialize(newEvent(i, eventTimestamp)));
 
           long sleepTime = calculateSleepTimeMs(eventsPerSecond - i, nowCeilingToSecond);
           if ((i <= 100 && i % 10 == 0) || i % 100 == 0) {
-            log.info("Event: %s/%s, sleep time: %s ms", i, eventsPerSecond, sleepTime);
+            LOG.info("Event: %s/%s, sleep time: %s ms", i, eventsPerSecond, sleepTime);
           }
 
           if (sleepTime > 0) {
@@ -119,7 +106,7 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
         eventTimestamp = eventTimestamp.plusSeconds(1);
         seconds++;
 
-        log.info(
+        LOG.info(
             "Finished writing %s events, current time: %s - updating next timestamp to: %s",
             eventsPerSecond,
             DateTimes.nowUtc(),
@@ -128,7 +115,7 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
 
         if (seconds >= totalNumberOfSeconds) {
           streamEventWriter.flush();
-          log.info(
+          LOG.info(
               "Finished writing %s seconds",
               seconds
           );
@@ -141,11 +128,6 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
     }
   }
 
-  @Override
-  public void shutdown()
-  {
-  }
-
   /**
    * Dynamically adjust delay between messages to spread them out over the remaining time left in the second.
    */
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/WikipediaStreamEventStreamGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/WikipediaStreamEventStreamGenerator.java
index 4fea67d..075e77b 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/WikipediaStreamEventStreamGenerator.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/WikipediaStreamEventStreamGenerator.java
@@ -19,42 +19,44 @@
 
 package org.apache.druid.testing.utils;
 
+import org.apache.druid.java.util.common.Pair;
 import org.joda.time.DateTime;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 
 public class WikipediaStreamEventStreamGenerator extends SyntheticStreamGenerator
 {
   private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
 
-  public WikipediaStreamEventStreamGenerator(int eventsPerSeconds, long cyclePaddingMs)
+  public WikipediaStreamEventStreamGenerator(EventSerializer serializer, int eventsPerSeconds, long cyclePaddingMs)
   {
-    super(eventsPerSeconds, cyclePaddingMs);
+    super(serializer, eventsPerSeconds, cyclePaddingMs);
   }
 
   @Override
-  Object getEvent(int i, DateTime timestamp)
+  List<Pair<String, Object>> newEvent(int i, DateTime timestamp)
   {
-    Map<String, Object> event = new HashMap<>();
-    event.put("page", "Gypsy Danger");
-    event.put("language", "en");
-    event.put("user", "nuclear");
-    event.put("unpatrolled", "true");
-    event.put("newPage", "true");
-    event.put("robot", "false");
-    event.put("anonymous", "false");
-    event.put("namespace", "article");
-    event.put("continent", "North America");
-    event.put("country", "United States");
-    event.put("region", "Bay Area");
-    event.put("city", "San Francisco");
-    event.put("timestamp", DATE_TIME_FORMATTER.print(timestamp));
-    event.put("added", i);
-    event.put("deleted", 0);
-    event.put("delta", i);
-    return event;
+    List<Pair<String, Object>> event = new ArrayList<>();
+    event.add(Pair.of("timestamp", DATE_TIME_FORMATTER.print(timestamp)));
+    event.add(Pair.of("page", "Gypsy Danger"));
+    event.add(Pair.of("language", "en"));
+    event.add(Pair.of("user", "nuclear"));
+    event.add(Pair.of("unpatrolled", "true"));
+    event.add(Pair.of("newPage", "true"));
+    event.add(Pair.of("robot", "false"));
+    event.add(Pair.of("anonymous", "false"));
+    event.add(Pair.of("namespace", "article"));
+    event.add(Pair.of("continent", "North America"));
+    event.add(Pair.of("country", "United States"));
+    event.add(Pair.of("region", "Bay Area"));
+    event.add(Pair.of("city", "San Francisco"));
+    event.add(Pair.of("added", i));
+    event.add(Pair.of("deleted", 0));
+    event.add(Pair.of("delta", i));
+    return Collections.unmodifiableList(event);
   }
 }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
index 9d8b7fe..c4c6864 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
@@ -37,6 +37,8 @@ public class TestNGGroup
 
   public static final String TRANSACTIONAL_KAFKA_INDEX_SLOW = "kafka-transactional-index-slow";
 
+  public static final String KAFKA_DATA_FORMAT = "kafka-data-format";
+
   public static final String OTHER_INDEX = "other-index";
 
   public static final String PERFECT_ROLLUP_PARALLEL_BATCH_INDEX = "perfect-rollup-parallel-batch-index";
@@ -110,4 +112,12 @@ public class TestNGGroup
    * Kinesis stream endpoint for a region must also be pass to mvn with -Ddruid.test.config.streamEndpoint=<ENDPOINT>
    */
   public static final String KINESIS_INDEX = "kinesis-index";
+
+  /**
+   * This group is not part of CI. To run this group, AWS kinesis configs/credentials for your AWS kinesis must be
+   * provided in a file. The path of the file must then be pass to mvn with -Doverride.config.path=<PATH_TO_FILE>
+   * See integration-tests/docker/environment-configs/override-examples/kinesis for env vars to provide.
+   * Kinesis stream endpoint for a region must also be pass to mvn with -Ddruid.test.config.streamEndpoint=<ENDPOINT>
+   */
+  public static final String KINESIS_DATA_FORMAT = "kinesis-data-format";
 }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
index 6811c32..afe5bbf 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
@@ -33,17 +33,19 @@ import org.apache.druid.testing.utils.ITRetryUtil;
 import org.apache.druid.testing.utils.TestQueryHelper;
 import org.joda.time.Interval;
 
+import java.io.BufferedReader;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
 
 public abstract class AbstractIndexerTest
 {
-
   @Inject
   protected CoordinatorResourceTestClient coordinator;
   @Inject
@@ -109,15 +111,33 @@ public abstract class AbstractIndexerTest
     );
   }
 
-  protected String getResourceAsString(String file) throws IOException
+  public static String getResourceAsString(String file) throws IOException
   {
-    final InputStream inputStream = ITRealtimeIndexTaskTest.class.getResourceAsStream(file);
-    try {
+    try (final InputStream inputStream = getResourceAsStream(file)) {
       return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
     }
-    finally {
-      IOUtils.closeQuietly(inputStream);
-    }
   }
 
+  public static InputStream getResourceAsStream(String resource)
+  {
+    return ITRealtimeIndexTaskTest.class.getResourceAsStream(resource);
+  }
+
+  public static List<String> listResources(String dir) throws IOException
+  {
+    List<String> resources = new ArrayList<>();
+
+    try (
+        InputStream in = getResourceAsStream(dir);
+        BufferedReader br = new BufferedReader(new InputStreamReader(in, StringUtils.UTF8_STRING))
+    ) {
+      String resource;
+
+      while ((resource = br.readLine()) != null) {
+        resources.add(resource);
+      }
+    }
+
+    return resources;
+  }
 }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
index ce769bf..3bb9693 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
@@ -33,8 +33,6 @@ import java.util.function.Function;
 
 public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamIndexingTest
 {
-  protected abstract boolean isKafkaWriterTransactionalEnabled();
-
   @Override
   StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config)
   {
@@ -42,15 +40,19 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamInd
   }
 
   @Override
-  public StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config)
+  public StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled)
   {
-    return new KafkaEventWriter(config, isKafkaWriterTransactionalEnabled());
+    return new KafkaEventWriter(config, transactionEnabled);
   }
 
   @Override
-  Function<String, String> generateStreamIngestionPropsTransform(String streamName,
-                                                                 String fullDatasourceName,
-                                                                 IntegrationTestingConfig config)
+  Function<String, String> generateStreamIngestionPropsTransform(
+      String streamName,
+      String fullDatasourceName,
+      String parserType,
+      String parserOrInputFormat,
+      IntegrationTestingConfig config
+  )
   {
     final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties();
     final Properties consumerProperties = new Properties();
@@ -78,6 +80,29 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamInd
             "%%TOPIC_VALUE%%",
             streamName
         );
+        if (AbstractStreamIndexingTest.INPUT_FORMAT.equals(parserType)) {
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_FORMAT%%",
+              parserOrInputFormat
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%PARSER%%",
+              "null"
+          );
+        } else if (AbstractStreamIndexingTest.INPUT_ROW_PARSER.equals(parserType)) {
+          spec = StringUtils.replace(
+              spec,
+              "%%PARSER%%",
+              parserOrInputFormat
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_FORMAT%%",
+              "null"
+          );
+        }
         spec = StringUtils.replace(
             spec,
             "%%USE_EARLIEST_KEY%%",
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
index 14c9cac..b8095a3 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
@@ -37,15 +37,20 @@ public abstract class AbstractKinesisIndexingServiceTest extends AbstractStreamI
   }
 
   @Override
-  StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) throws Exception
+  StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled)
+      throws Exception
   {
     return new KinesisEventWriter(config.getStreamEndpoint(), false);
   }
 
   @Override
-  Function<String, String> generateStreamIngestionPropsTransform(String streamName,
-                                                                 String fullDatasourceName,
-                                                                 IntegrationTestingConfig config)
+  Function<String, String> generateStreamIngestionPropsTransform(
+      String streamName,
+      String fullDatasourceName,
+      String parserType,
+      String parserOrInputFormat,
+      IntegrationTestingConfig config
+  )
   {
     return spec -> {
       try {
@@ -69,6 +74,29 @@ public abstract class AbstractKinesisIndexingServiceTest extends AbstractStreamI
             "%%TOPIC_VALUE%%",
             streamName
         );
+        if (AbstractStreamIndexingTest.INPUT_FORMAT.equals(parserType)) {
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_FORMAT%%",
+              parserOrInputFormat
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%PARSER%%",
+              "null"
+          );
+        } else if (AbstractStreamIndexingTest.INPUT_ROW_PARSER.equals(parserType)) {
+          spec = StringUtils.replace(
+              spec,
+              "%%PARSER%%",
+              parserOrInputFormat
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_FORMAT%%",
+              "null"
+          );
+        }
         spec = StringUtils.replace(
             spec,
             "%%USE_EARLIEST_KEY%%",
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
index 2f0c65a..506e80f 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
@@ -20,24 +20,34 @@
 package org.apache.druid.tests.indexer;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.utils.DruidClusterAdminClient;
+import org.apache.druid.testing.utils.EventSerializer;
 import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.JsonEventSerializer;
 import org.apache.druid.testing.utils.StreamAdminClient;
 import org.apache.druid.testing.utils.StreamEventWriter;
+import org.apache.druid.testing.utils.StreamGenerator;
 import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
 import org.joda.time.DateTime;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
 import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
 {
@@ -48,17 +58,32 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
   static final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
   static final int EVENTS_PER_SECOND = 6;
   static final int TOTAL_NUMBER_OF_SECOND = 10;
+
+  private static final Logger LOG = new Logger(AbstractStreamIndexingTest.class);
   // Since this integration test can terminates or be killed un-expectedly, this tag is added to all streams created
   // to help make stream clean up easier. (Normally, streams should be cleanup automattically by the teardown method)
   // The value to this tag is a timestamp that can be used by a lambda function to remove unused stream.
   private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
   private static final int STREAM_SHARD_COUNT = 2;
   private static final long WAIT_TIME_MILLIS = 3 * 60 * 1000L;
-  private static final String INDEXER_FILE_LEGACY_PARSER = "/indexer/stream_supervisor_spec_legacy_parser.json";
-  private static final String INDEXER_FILE_INPUT_FORMAT = "/indexer/stream_supervisor_spec_input_format.json";
-  private static final String QUERIES_FILE = "/indexer/stream_index_queries.json";
   private static final long CYCLE_PADDING_MS = 100;
-  private static final Logger LOG = new Logger(AbstractStreamIndexingTest.class);
+
+  private static final String QUERIES_FILE = "/stream/queries/stream_index_queries.json";
+  private static final String SUPERVISOR_SPEC_TEMPLATE_FILE = "supervisor_spec_template.json";
+
+  protected static final String DATA_RESOURCE_ROOT = "/stream/data";
+  protected static final String SUPERVISOR_SPEC_TEMPLATE_PATH =
+      String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_SPEC_TEMPLATE_FILE);
+  protected static final String SERIALIZER_SPEC_DIR = "serializer";
+  protected static final String INPUT_FORMAT_SPEC_DIR = "input_format";
+  protected static final String INPUT_ROW_PARSER_SPEC_DIR = "parser";
+
+  protected static final String SERIALIZER = "serializer";
+  protected static final String INPUT_FORMAT = "inputFormat";
+  protected static final String INPUT_ROW_PARSER = "parser";
+
+  private static final String JSON_INPUT_FORMAT_PATH =
+      String.join("/", DATA_RESOURCE_ROOT, "json", INPUT_FORMAT_SPEC_DIR, "input_format.json");
 
   @Inject
   private DruidClusterAdminClient druidClusterAdminClient;
@@ -67,92 +92,147 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
   private IntegrationTestingConfig config;
 
   private StreamAdminClient streamAdminClient;
-  private WikipediaStreamEventStreamGenerator wikipediaStreamEventGenerator;
 
   abstract StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception;
-  abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config) throws Exception;
-  abstract Function<String, String> generateStreamIngestionPropsTransform(String streamName,
-                                                                          String fullDatasourceName,
-                                                                          IntegrationTestingConfig config);
+
+  abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled)
+      throws Exception;
+
+  abstract Function<String, String> generateStreamIngestionPropsTransform(
+      String streamName,
+      String fullDatasourceName,
+      String parserType,
+      String parserOrInputFormat,
+      IntegrationTestingConfig config
+  );
+
   abstract Function<String, String> generateStreamQueryPropsTransform(String streamName, String fullDatasourceName);
+
   public abstract String getTestNamePrefix();
 
   protected void doBeforeClass() throws Exception
   {
     streamAdminClient = createStreamAdminClient(config);
-    wikipediaStreamEventGenerator = new WikipediaStreamEventStreamGenerator(EVENTS_PER_SECOND, CYCLE_PADDING_MS);
   }
 
-  protected void doClassTeardown()
+  private static String getOnlyResourcePath(String resourceRoot) throws IOException
   {
-    wikipediaStreamEventGenerator.shutdown();
+    return String.join("/", resourceRoot, Iterables.getOnlyElement(listResources(resourceRoot)));
   }
 
-  protected void doTestIndexDataWithLegacyParserStableState() throws Exception
+  protected static List<String> listDataFormatResources() throws IOException
   {
-    StreamEventWriter streamEventWriter = createStreamEventWriter(config);
-    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
-    try (
-        final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
-    ) {
-      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_LEGACY_PARSER));
-      LOG.info("supervisorSpec: [%s]\n", taskSpec);
-      // Start supervisor
-      generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
-      LOG.info("Submitted supervisor");
-      // Start data generator
-      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
-      verifyIngestedData(generatedTestConfig);
+    return listResources(DATA_RESOURCE_ROOT)
+        .stream()
+        .filter(resource -> !SUPERVISOR_SPEC_TEMPLATE_FILE.equals(resource))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Returns a map of key to path to spec. The returned map contains at least 2 specs and one of them
+   * should be a {@link #SERIALIZER} spec.
+   */
+  protected static Map<String, String> findTestSpecs(String resourceRoot) throws IOException
+  {
+    final List<String> specDirs = listResources(resourceRoot);
+    final Map<String, String> map = new HashMap<>();
+    for (String eachSpec : specDirs) {
+      if (SERIALIZER_SPEC_DIR.equals(eachSpec)) {
+        map.put(SERIALIZER, getOnlyResourcePath(String.join("/", resourceRoot, SERIALIZER_SPEC_DIR)));
+      } else if (INPUT_ROW_PARSER_SPEC_DIR.equals(eachSpec)) {
+        map.put(INPUT_ROW_PARSER, getOnlyResourcePath(String.join("/", resourceRoot, INPUT_ROW_PARSER_SPEC_DIR)));
+      } else if (INPUT_FORMAT_SPEC_DIR.equals(eachSpec)) {
+        map.put(INPUT_FORMAT, getOnlyResourcePath(String.join("/", resourceRoot, INPUT_FORMAT_SPEC_DIR)));
+      }
     }
-    finally {
-      doMethodTeardown(generatedTestConfig, streamEventWriter);
+    if (!map.containsKey(SERIALIZER_SPEC_DIR)) {
+      throw new IAE("Failed to find serializer spec under [%s]. Found resources are %s", resourceRoot, map);
     }
+    if (map.size() == 1) {
+      throw new IAE("Failed to find input format or parser spec under [%s]. Found resources are %s", resourceRoot, map);
+    }
+    return map;
+  }
+
+  private Closeable createResourceCloser(GeneratedTestConfig generatedTestConfig)
+  {
+    return Closer.create().register(() -> doMethodTeardown(generatedTestConfig));
   }
 
-  protected void doTestIndexDataWithInputFormatStableState() throws Exception
+  protected void doTestIndexDataStableState(
+      boolean transactionEnabled,
+      String serializerPath,
+      String parserType,
+      String specPath
+  ) throws Exception
   {
-    StreamEventWriter streamEventWriter = createStreamEventWriter(config);
-    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
+    final EventSerializer serializer = jsonMapper.readValue(getResourceAsStream(serializerPath), EventSerializer.class);
+    final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
+        serializer,
+        EVENTS_PER_SECOND,
+        CYCLE_PADDING_MS
+    );
+    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(parserType, getResourceAsString(specPath));
     try (
-        final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
+        final Closeable closer = createResourceCloser(generatedTestConfig);
+        final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
     ) {
-      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
+                                                 .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
       LOG.info("supervisorSpec: [%s]\n", taskSpec);
       // Start supervisor
       generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
       LOG.info("Submitted supervisor");
       // Start data generator
-      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, TOTAL_NUMBER_OF_SECOND, FIRST_EVENT_TIME);
+      streamGenerator.run(
+          generatedTestConfig.getStreamName(),
+          streamEventWriter,
+          TOTAL_NUMBER_OF_SECOND,
+          FIRST_EVENT_TIME
+      );
       verifyIngestedData(generatedTestConfig);
     }
-    finally {
-      doMethodTeardown(generatedTestConfig, streamEventWriter);
-    }
   }
 
-  void doTestIndexDataWithLosingCoordinator() throws Exception
+  void doTestIndexDataWithLosingCoordinator(boolean transactionEnabled) throws Exception
   {
-    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartCoordinatorContainer(), () -> druidClusterAdminClient.waitUntilCoordinatorReady());
+    testIndexWithLosingNodeHelper(
+        () -> druidClusterAdminClient.restartCoordinatorContainer(),
+        () -> druidClusterAdminClient.waitUntilCoordinatorReady(),
+        transactionEnabled
+    );
   }
 
-  void doTestIndexDataWithLosingOverlord() throws Exception
+  void doTestIndexDataWithLosingOverlord(boolean transactionEnabled) throws Exception
   {
-    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartIndexerContainer(), () -> druidClusterAdminClient.waitUntilIndexerReady());
+    testIndexWithLosingNodeHelper(
+        () -> druidClusterAdminClient.restartIndexerContainer(),
+        () -> druidClusterAdminClient.waitUntilIndexerReady(),
+        transactionEnabled
+    );
   }
 
-  void doTestIndexDataWithLosingHistorical() throws Exception
+  void doTestIndexDataWithLosingHistorical(boolean transactionEnabled) throws Exception
   {
-    testIndexWithLosingNodeHelper(() -> druidClusterAdminClient.restartHistoricalContainer(), () -> druidClusterAdminClient.waitUntilHistoricalReady());
+    testIndexWithLosingNodeHelper(
+        () -> druidClusterAdminClient.restartHistoricalContainer(),
+        () -> druidClusterAdminClient.waitUntilHistoricalReady(),
+        transactionEnabled
+    );
   }
 
-  protected void doTestIndexDataWithStartStopSupervisor() throws Exception
+  protected void doTestIndexDataWithStartStopSupervisor(boolean transactionEnabled) throws Exception
   {
-    StreamEventWriter streamEventWriter = createStreamEventWriter(config);
-    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
+    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
+        INPUT_FORMAT,
+        getResourceAsString(JSON_INPUT_FORMAT_PATH)
+    );
     try (
-        final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
+        final Closeable closer = createResourceCloser(generatedTestConfig);
+        final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
     ) {
-      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
+                                                 .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
       LOG.info("supervisorSpec: [%s]\n", taskSpec);
       // Start supervisor
       generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
@@ -161,7 +241,17 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
       int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
       int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
       secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
-      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
+          new JsonEventSerializer(jsonMapper),
+          EVENTS_PER_SECOND,
+          CYCLE_PADDING_MS
+      );
+      streamGenerator.run(
+          generatedTestConfig.getStreamName(),
+          streamEventWriter,
+          secondsToGenerateFirstRound,
+          FIRST_EVENT_TIME
+      );
       // Verify supervisor is healthy before suspension
       ITRetryUtil.retryUntil(
           () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
@@ -173,7 +263,12 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
       // Suspend the supervisor
       indexer.suspendSupervisor(generatedTestConfig.getSupervisorId());
       // Start generating remainning half of the data
-      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      streamGenerator.run(
+          generatedTestConfig.getStreamName(),
+          streamEventWriter,
+          secondsToGenerateRemaining,
+          FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)
+      );
       // Resume the supervisor
       indexer.resumeSupervisor(generatedTestConfig.getSupervisorId());
       // Verify supervisor is healthy after suspension
@@ -187,31 +282,36 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
       // Verify that supervisor can catch up with the stream
       verifyIngestedData(generatedTestConfig);
     }
-    finally {
-      doMethodTeardown(generatedTestConfig, streamEventWriter);
-    }
   }
 
-  protected void doTestIndexDataWithStreamReshardSplit() throws Exception
+  protected void doTestIndexDataWithStreamReshardSplit(boolean transactionEnabled) throws Exception
   {
     // Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT * 2
-    testIndexWithStreamReshardHelper(STREAM_SHARD_COUNT * 2);
+    testIndexWithStreamReshardHelper(transactionEnabled, STREAM_SHARD_COUNT * 2);
   }
 
-  protected void doTestIndexDataWithStreamReshardMerge() throws Exception
+  protected void doTestIndexDataWithStreamReshardMerge(boolean transactionEnabled) throws Exception
   {
     // Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT / 2
-    testIndexWithStreamReshardHelper(STREAM_SHARD_COUNT / 2);
+    testIndexWithStreamReshardHelper(transactionEnabled, STREAM_SHARD_COUNT / 2);
   }
 
-  private void testIndexWithLosingNodeHelper(Runnable restartRunnable, Runnable waitForReadyRunnable) throws Exception
+  private void testIndexWithLosingNodeHelper(
+      Runnable restartRunnable,
+      Runnable waitForReadyRunnable,
+      boolean transactionEnabled
+  ) throws Exception
   {
-    StreamEventWriter streamEventWriter = createStreamEventWriter(config);
-    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
+    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
+        INPUT_FORMAT,
+        getResourceAsString(JSON_INPUT_FORMAT_PATH)
+    );
     try (
-        final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
+        final Closeable closer = createResourceCloser(generatedTestConfig);
+        final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
     ) {
-      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
+                                                 .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
       LOG.info("supervisorSpec: [%s]\n", taskSpec);
       // Start supervisor
       generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
@@ -220,7 +320,17 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
       int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
       int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
       secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
-      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
+          new JsonEventSerializer(jsonMapper),
+          EVENTS_PER_SECOND,
+          CYCLE_PADDING_MS
+      );
+      streamGenerator.run(
+          generatedTestConfig.getStreamName(),
+          streamEventWriter,
+          secondsToGenerateFirstRound,
+          FIRST_EVENT_TIME
+      );
       // Verify supervisor is healthy before restart
       ITRetryUtil.retryUntil(
           () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
@@ -236,13 +346,23 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
       // Start generating one third of the data (while restarting)
       int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
       secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
-      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      streamGenerator.run(
+          generatedTestConfig.getStreamName(),
+          streamEventWriter,
+          secondsToGenerateSecondRound,
+          FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)
+      );
       // Wait for Druid process to be available
       LOG.info("Waiting for Druid process to be available");
       waitForReadyRunnable.run();
       LOG.info("Druid process is now available");
       // Start generating remaining data (after restarting)
-      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
+      streamGenerator.run(
+          generatedTestConfig.getStreamName(),
+          streamEventWriter,
+          secondsToGenerateRemaining,
+          FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound)
+      );
       // Verify supervisor is healthy
       ITRetryUtil.retryUntil(
           () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
@@ -254,19 +374,20 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
       // Verify that supervisor ingested all data
       verifyIngestedData(generatedTestConfig);
     }
-    finally {
-      doMethodTeardown(generatedTestConfig, streamEventWriter);
-    }
   }
 
-  private void testIndexWithStreamReshardHelper(int newShardCount) throws Exception
+  private void testIndexWithStreamReshardHelper(boolean transactionEnabled, int newShardCount) throws Exception
   {
-    StreamEventWriter streamEventWriter = createStreamEventWriter(config);
-    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig();
+    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
+        INPUT_FORMAT,
+        getResourceAsString(JSON_INPUT_FORMAT_PATH)
+    );
     try (
-        final Closeable ignored1 = unloader(generatedTestConfig.getFullDatasourceName())
+        final Closeable closer = createResourceCloser(generatedTestConfig);
+        final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
     ) {
-      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform().apply(getResourceAsString(INDEXER_FILE_INPUT_FORMAT));
+      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
+                                                 .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
       LOG.info("supervisorSpec: [%s]\n", taskSpec);
       // Start supervisor
       generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
@@ -275,7 +396,17 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
       int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
       int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 3;
       secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
-      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateFirstRound, FIRST_EVENT_TIME);
+      final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
+          new JsonEventSerializer(jsonMapper),
+          EVENTS_PER_SECOND,
+          CYCLE_PADDING_MS
+      );
+      streamGenerator.run(
+          generatedTestConfig.getStreamName(),
+          streamEventWriter,
+          secondsToGenerateFirstRound,
+          FIRST_EVENT_TIME
+      );
       // Verify supervisor is healthy before resahrding
       ITRetryUtil.retryUntil(
           () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
@@ -289,7 +420,12 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
       // Start generating one third of the data (while resharding)
       int secondsToGenerateSecondRound = TOTAL_NUMBER_OF_SECOND / 3;
       secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateSecondRound;
-      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateSecondRound, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound));
+      streamGenerator.run(
+          generatedTestConfig.getStreamName(),
+          streamEventWriter,
+          secondsToGenerateSecondRound,
+          FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)
+      );
       // Wait for stream to finish resharding
       ITRetryUtil.retryUntil(
           () -> streamAdminClient.isStreamActive(generatedTestConfig.getStreamName()),
@@ -299,14 +435,23 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
           "Waiting for stream to finish resharding"
       );
       ITRetryUtil.retryUntil(
-          () -> streamAdminClient.verfiyPartitionCountUpdated(generatedTestConfig.getStreamName(), STREAM_SHARD_COUNT, newShardCount),
+          () -> streamAdminClient.verfiyPartitionCountUpdated(
+              generatedTestConfig.getStreamName(),
+              STREAM_SHARD_COUNT,
+              newShardCount
+          ),
           true,
           10000,
           30,
           "Waiting for stream to finish resharding"
       );
       // Start generating remaining data (after resharding)
-      wikipediaStreamEventGenerator.run(generatedTestConfig.getStreamName(), streamEventWriter, secondsToGenerateRemaining, FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound));
+      streamGenerator.run(
+          generatedTestConfig.getStreamName(),
+          streamEventWriter,
+          secondsToGenerateRemaining,
+          FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound + secondsToGenerateSecondRound)
+      );
       // Verify supervisor is healthy after resahrding
       ITRetryUtil.retryUntil(
           () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
@@ -318,9 +463,6 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
       // Verify that supervisor can catch up with the stream
       verifyIngestedData(generatedTestConfig);
     }
-    finally {
-      doMethodTeardown(generatedTestConfig, streamEventWriter);
-    }
   }
 
   private void verifyIngestedData(GeneratedTestConfig generatedTestConfig) throws Exception
@@ -329,11 +471,14 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
     LOG.info("Waiting for [%s] millis for stream indexing tasks to consume events", WAIT_TIME_MILLIS);
     Thread.sleep(WAIT_TIME_MILLIS);
     // Query data
-    final String querySpec = generatedTestConfig.getStreamQueryPropsTransform().apply(getResourceAsString(QUERIES_FILE));
+    final String querySpec = generatedTestConfig.getStreamQueryPropsTransform()
+                                                .apply(getResourceAsString(QUERIES_FILE));
     // this query will probably be answered from the indexing tasks but possibly from 2 historical segments / 2 indexing
     this.queryHelper.testQueriesFromString(querySpec, 2);
     LOG.info("Shutting down supervisor");
     indexer.shutdownSupervisor(generatedTestConfig.getSupervisorId());
+    // Clear supervisor ID to not shutdown again.
+    generatedTestConfig.setSupervisorId(null);
     // wait for all indexing tasks to finish
     LOG.info("Waiting for all indexing tasks to finish");
     ITRetryUtil.retryUntilTrue(
@@ -358,22 +503,16 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
     return (numEvents * (1 + numEvents)) / 2;
   }
 
-  private void doMethodTeardown(GeneratedTestConfig generatedTestConfig, StreamEventWriter streamEventWriter)
+  private void doMethodTeardown(GeneratedTestConfig generatedTestConfig)
   {
-    try {
-      streamEventWriter.flush();
-      streamEventWriter.shutdown();
-    }
-    catch (Exception e) {
-      // Best effort cleanup as the writer may have already been cleanup
-      LOG.warn(e, "Failed to cleanup writer. This might be expected depending on the test method");
-    }
-    try {
-      indexer.shutdownSupervisor(generatedTestConfig.getSupervisorId());
-    }
-    catch (Exception e) {
-      // Best effort cleanup as the supervisor may have already been cleanup
-      LOG.warn(e, "Failed to cleanup supervisor. This might be expected depending on the test method");
+    if (generatedTestConfig.getSupervisorId() != null) {
+      try {
+        indexer.shutdownSupervisor(generatedTestConfig.getSupervisorId());
+      }
+      catch (Exception e) {
+        // Best effort cleanup as the supervisor may have already been cleanup
+        LOG.warn(e, "Failed to cleanup supervisor. This might be expected depending on the test method");
+      }
     }
     try {
       unloader(generatedTestConfig.getFullDatasourceName());
@@ -393,17 +532,20 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
 
   private class GeneratedTestConfig
   {
-    private String streamName;
-    private String fullDatasourceName;
+    private final String streamName;
+    private final String fullDatasourceName;
     private String supervisorId;
     private Function<String, String> streamIngestionPropsTransform;
     private Function<String, String> streamQueryPropsTransform;
 
-    GeneratedTestConfig() throws Exception
+    GeneratedTestConfig(String parserType, String parserOrInputFormat) throws Exception
     {
       streamName = getTestNamePrefix() + "_index_test_" + UUID.randomUUID();
       String datasource = getTestNamePrefix() + "_indexing_service_test_" + UUID.randomUUID();
-      Map<String, String> tags = ImmutableMap.of(STREAM_EXPIRE_TAG, Long.toString(DateTimes.nowUtc().plusMinutes(30).getMillis()));
+      Map<String, String> tags = ImmutableMap.of(
+          STREAM_EXPIRE_TAG,
+          Long.toString(DateTimes.nowUtc().plusMinutes(30).getMillis())
+      );
       streamAdminClient.createStream(streamName, STREAM_SHARD_COUNT, tags);
       ITRetryUtil.retryUntil(
           () -> streamAdminClient.isStreamActive(streamName),
@@ -413,7 +555,13 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
           "Wait for stream active"
       );
       fullDatasourceName = datasource + config.getExtraDatasourceNameSuffix();
-      streamIngestionPropsTransform = generateStreamIngestionPropsTransform(streamName, fullDatasourceName, config);
+      streamIngestionPropsTransform = generateStreamIngestionPropsTransform(
+          streamName,
+          fullDatasourceName,
+          parserType,
+          parserOrInputFormat,
+          config
+      );
       streamQueryPropsTransform = generateStreamQueryPropsTransform(streamName, fullDatasourceName);
     }
 
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java
index 99713a7..a3b845b 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java
@@ -21,7 +21,6 @@ package org.apache.druid.tests.indexer;
 
 import org.apache.druid.testing.guice.DruidTestModuleFactory;
 import org.apache.druid.tests.TestNGGroup;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
@@ -31,12 +30,6 @@ import org.testng.annotations.Test;
 public class ITKafkaIndexingServiceNonTransactionalSerializedTest extends AbstractKafkaIndexingServiceTest
 {
   @Override
-  protected boolean isKafkaWriterTransactionalEnabled()
-  {
-    return false;
-  }
-
-  @Override
   public String getTestNamePrefix()
   {
     return "kafka_nontransactional_serialized";
@@ -48,19 +41,13 @@ public class ITKafkaIndexingServiceNonTransactionalSerializedTest extends Abstra
     doBeforeClass();
   }
 
-  @AfterClass
-  public void tearDown()
-  {
-    doClassTeardown();
-  }
-
   /**
    * This test must be run individually since the test affect and modify the state of the Druid cluster
    */
   @Test
   public void testKafkaIndexDataWithLosingCoordinator() throws Exception
   {
-    doTestIndexDataWithLosingCoordinator();
+    doTestIndexDataWithLosingCoordinator(false);
   }
 
   /**
@@ -69,7 +56,7 @@ public class ITKafkaIndexingServiceNonTransactionalSerializedTest extends Abstra
   @Test
   public void testKafkaIndexDataWithLosingOverlord() throws Exception
   {
-    doTestIndexDataWithLosingOverlord();
+    doTestIndexDataWithLosingOverlord(false);
   }
 
   /**
@@ -78,6 +65,6 @@ public class ITKafkaIndexingServiceNonTransactionalSerializedTest extends Abstra
   @Test
   public void testKafkaIndexDataWithLosingHistorical() throws Exception
   {
-    doTestIndexDataWithLosingHistorical();
+    doTestIndexDataWithLosingHistorical(false);
   }
 }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java
index 06bcf05..fdd06ff 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java
@@ -21,7 +21,6 @@ package org.apache.druid.tests.indexer;
 
 import org.apache.druid.testing.guice.DruidTestModuleFactory;
 import org.apache.druid.tests.TestNGGroup;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
@@ -31,12 +30,6 @@ import org.testng.annotations.Test;
 public class ITKafkaIndexingServiceTransactionalSerializedTest extends AbstractKafkaIndexingServiceTest
 {
   @Override
-  protected boolean isKafkaWriterTransactionalEnabled()
-  {
-    return true;
-  }
-
-  @Override
   public String getTestNamePrefix()
   {
     return "kafka_transactional_serialized";
@@ -48,19 +41,13 @@ public class ITKafkaIndexingServiceTransactionalSerializedTest extends AbstractK
     doBeforeClass();
   }
 
-  @AfterClass
-  public void tearDown()
-  {
-    doClassTeardown();
-  }
-
   /**
    * This test must be run individually since the test affect and modify the state of the Druid cluster
    */
   @Test
   public void testKafkaIndexDataWithLosingCoordinator() throws Exception
   {
-    doTestIndexDataWithLosingCoordinator();
+    doTestIndexDataWithLosingCoordinator(true);
   }
 
   /**
@@ -69,7 +56,7 @@ public class ITKafkaIndexingServiceTransactionalSerializedTest extends AbstractK
   @Test
   public void testKafkaIndexDataWithLosingOverlord() throws Exception
   {
-    doTestIndexDataWithLosingOverlord();
+    doTestIndexDataWithLosingOverlord(true);
   }
 
   /**
@@ -78,6 +65,6 @@ public class ITKafkaIndexingServiceTransactionalSerializedTest extends AbstractK
   @Test
   public void testKafkaIndexDataWithLosingHistorical() throws Exception
   {
-    doTestIndexDataWithLosingHistorical();
+    doTestIndexDataWithLosingHistorical(true);
   }
 }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java
index 8e64abb..fed1361 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java
@@ -21,7 +21,6 @@ package org.apache.druid.tests.indexer;
 
 import org.apache.druid.testing.guice.DruidTestModuleFactory;
 import org.apache.druid.tests.TestNGGroup;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
@@ -42,19 +41,13 @@ public class ITKinesisIndexingServiceSerializedTest extends AbstractKinesisIndex
     doBeforeClass();
   }
 
-  @AfterClass
-  public void tearDown()
-  {
-    doClassTeardown();
-  }
-
   /**
    * This test must be run individually since the test affect and modify the state of the Druid cluster
    */
   @Test
   public void testKinesisIndexDataWithLosingCoordinator() throws Exception
   {
-    doTestIndexDataWithLosingCoordinator();
+    doTestIndexDataWithLosingCoordinator(false);
   }
 
   /**
@@ -63,7 +56,7 @@ public class ITKinesisIndexingServiceSerializedTest extends AbstractKinesisIndex
   @Test
   public void testKinesisIndexDataWithLosingOverlord() throws Exception
   {
-    doTestIndexDataWithLosingOverlord();
+    doTestIndexDataWithLosingOverlord(false);
   }
 
   /**
@@ -72,6 +65,6 @@ public class ITKinesisIndexingServiceSerializedTest extends AbstractKinesisIndex
   @Test
   public void testKinesisIndexDataWithLosingHistorical() throws Exception
   {
-    doTestIndexDataWithLosingHistorical();
+    doTestIndexDataWithLosingHistorical(false);
   }
 }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java
new file mode 100644
index 0000000..9143d9b
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceDataFormatTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.parallelized;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
+import org.apache.druid.tests.indexer.AbstractStreamIndexingTest;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+@Test(groups = TestNGGroup.KAFKA_DATA_FORMAT)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKafkaIndexingServiceDataFormatTest extends AbstractKafkaIndexingServiceTest
+{
+  private static final boolean TRANSACTION_DISABLED = false;
+  private static final boolean TRANSACTION_ENABLED = true;
+
+  /**
+   * Generates test parameters based on the given resources. The resources should be structured as
+   *
+   * <pre>{@code
+   * {RESOURCES_ROOT}/stream/data/{DATA_FORMAT}/serializer
+   *                                           /input_format
+   *                                           /parser
+   * }</pre>
+   *
+   * The {@code serializer} directory contains the spec of {@link org.apache.druid.testing.utils.EventSerializer} and
+   * must be present. Either {@code input_format} or {@code parser} directory should be present if {@code serializer}
+   * is present.
+   */
+  @DataProvider(parallel = true)
+  public static Object[][] resources() throws IOException
+  {
+    final List<Object[]> resources = new ArrayList<>();
+    final List<String> dataFormats = listDataFormatResources();
+    for (String eachFormat : dataFormats) {
+      final Map<String, String> spec = findTestSpecs(String.join("/", DATA_RESOURCE_ROOT, eachFormat));
+      final String serializerPath = spec.get(AbstractStreamIndexingTest.SERIALIZER);
+      spec.forEach((k, path) -> {
+        if (!AbstractStreamIndexingTest.SERIALIZER.equals(k)) {
+          resources.add(new Object[]{TRANSACTION_DISABLED, serializerPath, k, path});
+          resources.add(new Object[]{TRANSACTION_ENABLED, serializerPath, k, path});
+        }
+      });
+    }
+
+    return resources.toArray(new Object[0][]);
+  }
+
+  @Inject
+  private @Json ObjectMapper jsonMapper;
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    doBeforeClass();
+  }
+
+  @Test(dataProvider = "resources")
+  public void testIndexData(boolean transactionEnabled, String serializerPath, String parserType, String specPath)
+      throws Exception
+  {
+    doTestIndexDataStableState(transactionEnabled, serializerPath, parserType, specPath);
+  }
+
+  @Override
+  public String getTestNamePrefix()
+  {
+    return "kafka_data_format";
+  }
+}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
index 199530e..2c648ea 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.tests.parallelized;
 import org.apache.druid.testing.guice.DruidTestModuleFactory;
 import org.apache.druid.tests.TestNGGroup;
 import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
@@ -32,12 +31,6 @@ import org.testng.annotations.Test;
 public class ITKafkaIndexingServiceNonTransactionalParallelizedTest extends AbstractKafkaIndexingServiceTest
 {
   @Override
-  protected boolean isKafkaWriterTransactionalEnabled()
-  {
-    return false;
-  }
-
-  @Override
   public String getTestNamePrefix()
   {
     return "kafka_non_transactional_parallelized";
@@ -49,32 +42,6 @@ public class ITKafkaIndexingServiceNonTransactionalParallelizedTest extends Abst
     doBeforeClass();
   }
 
-  @AfterClass
-  public void tearDown()
-  {
-    doClassTeardown();
-  }
-
-  /**
-   * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
-   * and supervisor maintained and scoped within this test only
-   */
-  @Test
-  public void testKafkaIndexDataWithLegacyParserStableState() throws Exception
-  {
-    doTestIndexDataWithLegacyParserStableState();
-  }
-
-  /**
-   * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
-   * and supervisor maintained and scoped within this test only
-   */
-  @Test
-  public void testKafkaIndexDataWithInputFormatStableState() throws Exception
-  {
-    doTestIndexDataWithInputFormatStableState();
-  }
-
   /**
    * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
    * and supervisor maintained and scoped within this test only
@@ -82,7 +49,7 @@ public class ITKafkaIndexingServiceNonTransactionalParallelizedTest extends Abst
   @Test
   public void testKafkaIndexDataWithStartStopSupervisor() throws Exception
   {
-    doTestIndexDataWithStartStopSupervisor();
+    doTestIndexDataWithStartStopSupervisor(false);
   }
 
   /**
@@ -92,6 +59,6 @@ public class ITKafkaIndexingServiceNonTransactionalParallelizedTest extends Abst
   @Test
   public void testKafkaIndexDataWithKafkaReshardSplit() throws Exception
   {
-    doTestIndexDataWithStreamReshardSplit();
+    doTestIndexDataWithStreamReshardSplit(false);
   }
 }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceTransactionalParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceTransactionalParallelizedTest.java
index 7db3a7f..d61e977 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceTransactionalParallelizedTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceTransactionalParallelizedTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.tests.parallelized;
 import org.apache.druid.testing.guice.DruidTestModuleFactory;
 import org.apache.druid.tests.TestNGGroup;
 import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
@@ -32,12 +31,6 @@ import org.testng.annotations.Test;
 public class ITKafkaIndexingServiceTransactionalParallelizedTest extends AbstractKafkaIndexingServiceTest
 {
   @Override
-  protected boolean isKafkaWriterTransactionalEnabled()
-  {
-    return true;
-  }
-
-  @Override
   public String getTestNamePrefix()
   {
     return "kafka_transactional_parallelized";
@@ -49,32 +42,6 @@ public class ITKafkaIndexingServiceTransactionalParallelizedTest extends Abstrac
     doBeforeClass();
   }
 
-  @AfterClass
-  public void tearDown()
-  {
-    doClassTeardown();
-  }
-
-  /**
-   * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
-   * and supervisor maintained and scoped within this test only
-   */
-  @Test
-  public void testKafkaIndexDataWithLegacyParserStableState() throws Exception
-  {
-    doTestIndexDataWithLegacyParserStableState();
-  }
-
-  /**
-   * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
-   * and supervisor maintained and scoped within this test only
-   */
-  @Test
-  public void testKafkaIndexDataWithInputFormatStableState() throws Exception
-  {
-    doTestIndexDataWithInputFormatStableState();
-  }
-
   /**
    * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
    * and supervisor maintained and scoped within this test only
@@ -82,7 +49,7 @@ public class ITKafkaIndexingServiceTransactionalParallelizedTest extends Abstrac
   @Test
   public void testKafkaIndexDataWithStartStopSupervisor() throws Exception
   {
-    doTestIndexDataWithStartStopSupervisor();
+    doTestIndexDataWithStartStopSupervisor(true);
   }
 
   /**
@@ -92,6 +59,6 @@ public class ITKafkaIndexingServiceTransactionalParallelizedTest extends Abstrac
   @Test
   public void testKafkaIndexDataWithKafkaReshardSplit() throws Exception
   {
-    doTestIndexDataWithStreamReshardSplit();
+    doTestIndexDataWithStreamReshardSplit(true);
   }
 }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceDataFormatTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceDataFormatTest.java
new file mode 100644
index 0000000..c302cd1
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceDataFormatTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.parallelized;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.apache.druid.tests.indexer.AbstractKinesisIndexingServiceTest;
+import org.apache.druid.tests.indexer.AbstractStreamIndexingTest;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+@Test(groups = TestNGGroup.KINESIS_DATA_FORMAT)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKinesisIndexingServiceDataFormatTest extends AbstractKinesisIndexingServiceTest
+{
+  /**
+   * Generates test parameters based on the given resources. The resources should be structured as
+   *
+   * <pre>{@code
+   * {RESOURCES_ROOT}/stream/data/{DATA_FORMAT}/serializer
+   *                                           /input_format
+   *                                           /parser
+   * }</pre>
+   *
+   * The {@code serializer} directory contains the spec of {@link org.apache.druid.testing.utils.EventSerializer} and
+   * must be present. Either {@code input_format} or {@code parser} directory should be present if {@code serializer}
+   * is present.
+   */
+  @DataProvider(parallel = true)
+  public static Object[][] resources() throws IOException
+  {
+    final List<Object[]> resources = new ArrayList<>();
+    final List<String> dataFormats = listDataFormatResources();
+    for (String eachFormat : dataFormats) {
+      final Map<String, String> spec = findTestSpecs(String.join("/", DATA_RESOURCE_ROOT, eachFormat));
+      final String serializerPath = spec.get(AbstractStreamIndexingTest.SERIALIZER);
+      spec.forEach((k, path) -> {
+        if (!AbstractStreamIndexingTest.SERIALIZER.equals(k)) {
+          resources.add(new Object[]{serializerPath, k, path});
+        }
+      });
+    }
+
+    return resources.toArray(new Object[0][]);
+  }
+
+  @Inject
+  private @Json
+  ObjectMapper jsonMapper;
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    doBeforeClass();
+  }
+
+  @Test(dataProvider = "resources")
+  public void testIndexData(String serializerPath, String parserType, String specPath)
+      throws Exception
+  {
+    doTestIndexDataStableState(false, serializerPath, parserType, specPath);
+  }
+
+  @Override
+  public String getTestNamePrefix()
+  {
+    return "kinesis_data_format";
+  }
+}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java
index 38816dc..efd107f 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.tests.parallelized;
 import org.apache.druid.testing.guice.DruidTestModuleFactory;
 import org.apache.druid.tests.TestNGGroup;
 import org.apache.druid.tests.indexer.AbstractKinesisIndexingServiceTest;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
@@ -43,32 +42,6 @@ public class ITKinesisIndexingServiceParallelizedTest extends AbstractKinesisInd
     doBeforeClass();
   }
 
-  @AfterClass
-  public void tearDown()
-  {
-    doClassTeardown();
-  }
-
-  /**
-   * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
-   * and supervisor maintained and scoped within this test only
-   */
-  @Test
-  public void testKinesisIndexDataWithLegacyParserStableState() throws Exception
-  {
-    doTestIndexDataWithLegacyParserStableState();
-  }
-
-  /**
-   * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
-   * and supervisor maintained and scoped within this test only
-   */
-  @Test
-  public void testKinesisIndexDataWithInputFormatStableState() throws Exception
-  {
-    doTestIndexDataWithInputFormatStableState();
-  }
-
   /**
    * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
    * and supervisor maintained and scoped within this test only
@@ -76,7 +49,7 @@ public class ITKinesisIndexingServiceParallelizedTest extends AbstractKinesisInd
   @Test
   public void testKinesisIndexDataWithStartStopSupervisor() throws Exception
   {
-    doTestIndexDataWithStartStopSupervisor();
+    doTestIndexDataWithStartStopSupervisor(false);
   }
 
   /**
@@ -86,7 +59,7 @@ public class ITKinesisIndexingServiceParallelizedTest extends AbstractKinesisInd
   @Test
   public void testKinesisIndexDataWithKinesisReshardSplit() throws Exception
   {
-    doTestIndexDataWithStreamReshardSplit();
+    doTestIndexDataWithStreamReshardSplit(false);
   }
 
   /**
@@ -96,6 +69,6 @@ public class ITKinesisIndexingServiceParallelizedTest extends AbstractKinesisInd
   @Test
   public void testKinesisIndexDataWithKinesisReshardMerge() throws Exception
   {
-    doTestIndexDataWithStreamReshardMerge();
+    doTestIndexDataWithStreamReshardMerge(false);
   }
 }
diff --git a/integration-tests/src/test/resources/indexer/stream_supervisor_spec_legacy_parser.json b/integration-tests/src/test/resources/indexer/stream_supervisor_spec_legacy_parser.json
deleted file mode 100644
index 623aadf..0000000
--- a/integration-tests/src/test/resources/indexer/stream_supervisor_spec_legacy_parser.json
+++ /dev/null
@@ -1,61 +0,0 @@
-{
-  "type": "%%STREAM_TYPE%%",
-  "dataSchema": {
-    "dataSource": "%%DATASOURCE%%",
-    "parser": {
-      "type": "string",
-      "parseSpec": {
-        "format": "json",
-        "timestampSpec": {
-          "column": "timestamp",
-          "format": "auto"
-        },
-        "dimensionsSpec": {
-          "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"],
-          "dimensionExclusions": [],
-          "spatialDimensions": []
-        }
-      }
-    },
-    "metricsSpec": [
-      {
-        "type": "count",
-        "name": "count"
-      },
-      {
-        "type": "doubleSum",
-        "name": "added",
-        "fieldName": "added"
-      },
-      {
-        "type": "doubleSum",
-        "name": "deleted",
-        "fieldName": "deleted"
-      },
-      {
-        "type": "doubleSum",
-        "name": "delta",
-        "fieldName": "delta"
-      }
-    ],
-    "granularitySpec": {
-      "type": "uniform",
-      "segmentGranularity": "MINUTE",
-      "queryGranularity": "NONE"
-    }
-  },
-  "tuningConfig": {
-    "type": "%%STREAM_TYPE%%",
-    "intermediatePersistPeriod": "PT30S",
-    "maxRowsPerSegment": 5000000,
-    "maxRowsInMemory": 500000
-  },
-  "ioConfig": {
-    "%%TOPIC_KEY%%": "%%TOPIC_VALUE%%",
-    "%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%,
-    "taskCount": 2,
-    "replicas": 1,
-    "taskDuration": "PT5M",
-    "%%USE_EARLIEST_KEY%%": true
-  }
-}
diff --git a/integration-tests/src/test/resources/stream/data/avro/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/avro/parser/input_row_parser.json
new file mode 100644
index 0000000..4feea54
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/avro/parser/input_row_parser.json
@@ -0,0 +1,39 @@
+{
+  "type": "avro_stream",
+  "avroBytesDecoder" : {
+    "type": "schema_inline",
+    "schema": {
+      "namespace": "org.apache.druid",
+      "name": "wikipedia",
+      "type": "record",
+      "fields": [
+        { "name": "timestamp", "type": "string" },
+        { "name": "page", "type": "string" },
+        { "name": "language", "type": "string" },
+        { "name": "user", "type": "string" },
+        { "name": "unpatrolled", "type": "string" },
+        { "name": "newPage", "type": "string" },
+        { "name": "robot", "type": "string" },
+        { "name": "anonymous", "type": "string" },
+        { "name": "namespace", "type": "string" },
+        { "name": "continent", "type": "string" },
+        { "name": "country", "type": "string" },
+        { "name": "region", "type": "string" },
+        { "name": "city", "type": "string" },
+        { "name": "added", "type": "long" },
+        { "name": "deleted", "type": "long" },
+        { "name": "delta", "type": "long" }
+      ]
+    }
+  },
+  "parseSpec": {
+    "format": "avro",
+    "timestampSpec": {
+      "column": "timestamp",
+      "format": "auto"
+    },
+    "dimensionsSpec": {
+      "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"]
+    }
+  }
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/avro/serializer/serializer.json b/integration-tests/src/test/resources/stream/data/avro/serializer/serializer.json
new file mode 100644
index 0000000..20b3462
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/avro/serializer/serializer.json
@@ -0,0 +1,3 @@
+{
+  "type": "avro"
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/csv/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/csv/input_format/input_format.json
new file mode 100644
index 0000000..9143069
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/csv/input_format/input_format.json
@@ -0,0 +1,4 @@
+{
+  "type" : "csv",
+  "columns": ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"]
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/csv/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/csv/parser/input_row_parser.json
new file mode 100644
index 0000000..11fa718
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/csv/parser/input_row_parser.json
@@ -0,0 +1,16 @@
+{
+  "type": "string",
+  "parseSpec": {
+    "format": "csv",
+    "timestampSpec": {
+      "column": "timestamp",
+      "format": "auto"
+    },
+    "columns" : ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"],
+    "dimensionsSpec": {
+      "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"],
+      "dimensionExclusions": [],
+      "spatialDimensions": []
+    }
+  }
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/csv/serializer/serializer.json b/integration-tests/src/test/resources/stream/data/csv/serializer/serializer.json
new file mode 100644
index 0000000..b7dd0d8
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/csv/serializer/serializer.json
@@ -0,0 +1,3 @@
+{
+  "type": "csv"
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/json/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/json/input_format/input_format.json
new file mode 100644
index 0000000..cfb17e2
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/json/input_format/input_format.json
@@ -0,0 +1,3 @@
+{
+  "type" : "json"
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/json/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/json/parser/input_row_parser.json
new file mode 100644
index 0000000..b5fa892
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/json/parser/input_row_parser.json
@@ -0,0 +1,15 @@
+{
+  "type": "string",
+  "parseSpec": {
+    "format": "json",
+    "timestampSpec": {
+      "column": "timestamp",
+      "format": "auto"
+    },
+    "dimensionsSpec": {
+      "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"],
+      "dimensionExclusions": [],
+      "spatialDimensions": []
+    }
+  }
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/json/serializer/serializer.json b/integration-tests/src/test/resources/stream/data/json/serializer/serializer.json
new file mode 100644
index 0000000..126d77b
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/json/serializer/serializer.json
@@ -0,0 +1,3 @@
+{
+  "type": "json"
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/indexer/stream_supervisor_spec_input_format.json b/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
similarity index 95%
rename from integration-tests/src/test/resources/indexer/stream_supervisor_spec_input_format.json
rename to integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
index ce9bedc..9943431 100644
--- a/integration-tests/src/test/resources/indexer/stream_supervisor_spec_input_format.json
+++ b/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
@@ -2,6 +2,7 @@
   "type": "%%STREAM_TYPE%%",
   "dataSchema": {
     "dataSource": "%%DATASOURCE%%",
+    "parser": %%PARSER%%,
     "timestampSpec": {
       "column": "timestamp",
       "format": "auto"
@@ -51,8 +52,6 @@
     "replicas": 1,
     "taskDuration": "PT5M",
     "%%USE_EARLIEST_KEY%%": true,
-    "inputFormat" : {
-      "type" : "json"
-    }
+    "inputFormat" : %%INPUT_FORMAT%%
   }
 }
diff --git a/integration-tests/src/test/resources/stream/data/tsv/input_format/input_format.json b/integration-tests/src/test/resources/stream/data/tsv/input_format/input_format.json
new file mode 100644
index 0000000..ece2968
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/tsv/input_format/input_format.json
@@ -0,0 +1,4 @@
+{
+  "type" : "tsv",
+  "columns": ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"]
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/tsv/parser/input_row_parser.json b/integration-tests/src/test/resources/stream/data/tsv/parser/input_row_parser.json
new file mode 100644
index 0000000..c60a0c1
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/tsv/parser/input_row_parser.json
@@ -0,0 +1,16 @@
+{
+  "type": "string",
+  "parseSpec": {
+    "format": "tsv",
+    "timestampSpec": {
+      "column": "timestamp",
+      "format": "auto"
+    },
+    "columns" : ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"],
+    "dimensionsSpec": {
+      "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"],
+      "dimensionExclusions": [],
+      "spatialDimensions": []
+    }
+  }
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/stream/data/tsv/serializer/serializer.json b/integration-tests/src/test/resources/stream/data/tsv/serializer/serializer.json
new file mode 100644
index 0000000..731ad41
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/tsv/serializer/serializer.json
@@ -0,0 +1,3 @@
+{
+  "type": "tsv"
+}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/indexer/stream_index_queries.json b/integration-tests/src/test/resources/stream/queries/stream_index_queries.json
similarity index 100%
rename from integration-tests/src/test/resources/indexer/stream_index_queries.json
rename to integration-tests/src/test/resources/stream/queries/stream_index_queries.json
diff --git a/integration-tests/src/test/resources/testng.xml b/integration-tests/src/test/resources/testng.xml
index 5a0735a..88c6415 100644
--- a/integration-tests/src/test/resources/testng.xml
+++ b/integration-tests/src/test/resources/testng.xml
@@ -20,7 +20,7 @@
 <!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
 
 
-<suite name="IntegrationTestSuite">
+<suite name="IntegrationTestSuite" data-provider-thread-count="2">
   <listeners>
     <listener class-name="org.apache.druid.testing.utils.LoggerListener" />
     <listener class-name="org.apache.druid.testing.utils.SuiteListener" />


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org