Posted to commits@pinot.apache.org by ne...@apache.org on 2019/05/28 17:23:44 UTC

[incubator-pinot] branch master updated: Remove dependency on Kafka stream implementation classes from pinot-core and pinot-controller tests (#4233)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fcfcd88  Remove dependency on Kafka stream implementation classes from pinot-core and pinot-controller tests (#4233)
fcfcd88 is described below

commit fcfcd88266c163270169a1ffcfca3d61869c8e59
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Tue May 28 10:23:37 2019 -0700

    Remove dependency on Kafka stream implementation classes from pinot-core and pinot-controller tests (#4233)
    
    * Remove dependency on Kafka stream implementation classes from pinot-core and pinot-controller tests
    
    * Add FakeStream to be used in tests across the board
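
    In each touched test, the replacement reduces to the minimal sketch
    below. This is a hedged illustration: tableConfigBuilder stands in
    for whichever builder the individual test uses; the helper and
    accessor names are taken from the diff that follows.

        import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
        import org.apache.pinot.core.realtime.stream.StreamConfig;

        // One helper call replaces the hand-assembled "stream.kafka.*" property maps:
        StreamConfig streamConfig = FakeStreamConfigUtils.getDefaultHighLevelStreamConfigs();
        tableConfigBuilder.setStreamConfigs(streamConfig.getStreamConfigsMap());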
---
 .../resources/PinotTableRestletResourceTest.java   |  29 +-
 .../controller/api/resources/TableViewsTest.java   |  26 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        |  51 +-
 .../segment/FlushThresholdUpdaterTest.java         |  23 +-
 .../rebalance/DefaultRebalanceStrategyTest.java    |  31 +-
 .../sharding/SegmentAssignmentStrategyTest.java    |  15 +-
 .../realtime/LLRealtimeSegmentDataManagerTest.java |  52 +-
 .../fakestream/FakePartitionLevelConsumer.java     | 121 +++++
 .../impl/fakestream/FakeStreamConfigUtils.java     | 168 +++++++
 .../impl/fakestream/FakeStreamConsumerFactory.java | 101 ++++
 .../impl/fakestream/FakeStreamLevelConsumer.java   |  49 ++
 .../impl/fakestream/FakeStreamMessageBatch.java    |  56 +++
 .../impl/fakestream/FakeStreamMessageDecoder.java  |  71 +++
 .../fakestream/FakeStreamMetadataProvider.java     |  59 +++
 .../core/realtime/stream/StreamConfigTest.java     | 277 ++++++-----
 .../data/fakestream/fake_stream_avro_data.tar.gz   | Bin 0 -> 809609 bytes
 .../data/fakestream/fake_stream_avro_schema.avsc   | 524 +++++++++++++++++++++
 .../data/fakestream/fake_stream_pinot_schema.json  | 335 +++++++++++++
 18 files changed, 1657 insertions(+), 331 deletions(-)

diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
index 5dbe230..fe00c6a 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
@@ -21,8 +21,6 @@ package org.apache.pinot.controller.api.resources;
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.pinot.common.config.QuotaConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
@@ -31,9 +29,8 @@ import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
 import org.apache.pinot.controller.helix.ControllerTest;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStreamConfigProperties;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -75,30 +72,10 @@ public class PinotTableRestletResourceTest extends ControllerTest {
 
     // add schema for realtime table
     addDummySchema(REALTIME_TABLE_NAME);
-
-    Map<String, String> streamConfigs = new HashMap<>();
-    String streamType = "kafka";
-    streamConfigs.put(StreamConfigProperties.STREAM_TYPE, streamType);
-    streamConfigs
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            StreamConfig.ConsumerType.HIGHLEVEL.toString());
-    streamConfigs
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
-            "fakeTopic");
-    streamConfigs
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
-            "fakeClass");
-    streamConfigs.put(StreamConfigProperties
-        .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), "fakeClass");
-    streamConfigs.put(KafkaStreamConfigProperties
-            .constructStreamProperty(KafkaStreamConfigProperties.HighLevelConsumer.KAFKA_HLC_ZK_CONNECTION_STRING),
-        "fakeUrl");
-    streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, Integer.toString(1234));
-    streamConfigs.put(StreamConfigProperties
-        .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), "smallest");
+    StreamConfig streamConfig = FakeStreamConfigUtils.getDefaultHighLevelStreamConfigs();
     _realtimeBuilder.setTableName(REALTIME_TABLE_NAME).setTimeColumnName("timeColumn").setTimeType("DAYS")
         .setRetentionTimeUnit("DAYS").setRetentionTimeValue("5").setSchemaName(REALTIME_TABLE_NAME)
-        .setStreamConfigs(streamConfigs);
+        .setStreamConfigs(streamConfig.getStreamConfigsMap());
   }
 
   @Test
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/TableViewsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/TableViewsTest.java
index 4ac9418..ee14650 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/TableViewsTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/TableViewsTest.java
@@ -20,7 +20,6 @@ package org.apache.pinot.controller.api.resources;
 
 import java.net.HttpURLConnection;
 import java.net.URL;
-import java.util.HashMap;
 import java.util.Map;
 import org.apache.helix.InstanceType;
 import org.apache.pinot.common.config.TableConfig;
@@ -31,10 +30,8 @@ import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaAvroMessageDecoder;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -79,26 +76,9 @@ public class TableViewsTest extends ControllerTest {
 
     // add schema for realtime table
     addDummySchema(HYBRID_TABLE_NAME);
-    Map<String, String> streamConfigs = new HashMap<>();
-    String streamType = "kafka";
-    String topic = "aTopic";
-    String consumerFactoryClass = KafkaConsumerFactory.class.getName();
-    String decoderClass = KafkaAvroMessageDecoder.class.getName();
-    streamConfigs.put(StreamConfigProperties.STREAM_TYPE, streamType);
-    streamConfigs
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
-            topic);
-    streamConfigs
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            StreamConfig.ConsumerType.HIGHLEVEL.toString());
-    streamConfigs.put(StreamConfigProperties
-            .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
-        consumerFactoryClass);
-    streamConfigs
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
-            decoderClass);
+    StreamConfig streamConfig = FakeStreamConfigUtils.getDefaultHighLevelStreamConfigs();
     tableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(HYBRID_TABLE_NAME)
-        .setNumReplicas(2).setStreamConfigs(streamConfigs).build();
+        .setNumReplicas(2).setStreamConfigs(streamConfig.getStreamConfigsMap()).build();
     _helixResourceManager.addTable(tableConfig);
 
     // Wait for external view get updated
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 646d3bc..04e2b7b 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -63,9 +63,7 @@ import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.apache.pinot.controller.util.SegmentCompletionUtils;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaAvroMessageDecoder;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStreamConfigProperties;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.core.realtime.stream.OffsetCriteria;
 import org.apache.pinot.core.realtime.stream.StreamConfig;
 import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
@@ -1268,32 +1266,10 @@ public class PinotLLCRealtimeSegmentManagerTest {
     when(mockValidationConfig.getReplicasPerPartition()).thenReturn(Integer.toString(nReplicas));
     when(mockValidationConfig.getReplicasPerPartitionNumber()).thenReturn(nReplicas);
     when(mockTableConfig.getValidationConfig()).thenReturn(mockValidationConfig);
-    Map<String, String> streamConfigMap = new HashMap<>(1);
-    String streamType = "kafka";
-    streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
-    String topic = "aTopic";
-    String consumerFactoryClass = KafkaConsumerFactory.class.getName();
-    String decoderClass = KafkaAvroMessageDecoder.class.getName();
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
-            topic);
-    streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, "100000");
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            "simple");
-    streamConfigMap.put(StreamConfigProperties
-            .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
-        consumerFactoryClass);
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
-            decoderClass);
-
-    final String bootstrapHostConfigKey = KafkaStreamConfigProperties
-        .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST);
-    streamConfigMap.put(bootstrapHostConfigKey, bootstrapHosts);
 
+    StreamConfig streamConfig = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs();
     IndexingConfig mockIndexConfig = mock(IndexingConfig.class);
-    when(mockIndexConfig.getStreamConfigs()).thenReturn(streamConfigMap);
+    when(mockIndexConfig.getStreamConfigs()).thenReturn(streamConfig.getStreamConfigsMap());
 
     when(mockTableConfig.getIndexingConfig()).thenReturn(mockIndexConfig);
     TenantConfig mockTenantConfig = mock(TenantConfig.class);
@@ -1303,24 +1279,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
     return mockTableConfig;
   }
 
-  private static Map<String, String> getStreamConfigs() {
-    Map<String, String> streamPropMap = new HashMap<>(1);
-    String streamType = "kafka";
-    streamPropMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
-    String topic = "aTopic";
-    streamPropMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
-            topic);
-    streamPropMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            "simple");
-    streamPropMap.put(StreamConfigProperties
-        .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), "smallest");
-    streamPropMap.put(KafkaStreamConfigProperties
-        .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST), "host:1234");
-    return streamPropMap;
-  }
-
   //////////////////////////////////////////////////////////////////////////////////
   // Fake classes
   /////////////////////////////////////////////////////////////////////////////////
@@ -1372,7 +1330,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
         TableConfig mockTableConfig = mock(TableConfig.class);
         IndexingConfig mockIndexingConfig = mock(IndexingConfig.class);
         when(mockTableConfig.getIndexingConfig()).thenReturn(mockIndexingConfig);
-        when(mockIndexingConfig.getStreamConfigs()).thenReturn(getStreamConfigs());
+        StreamConfig streamConfig = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs();
+        when(mockIndexingConfig.getStreamConfigs()).thenReturn(streamConfig.getStreamConfigsMap());
         when(mockCache.getTableConfig(anyString())).thenReturn(mockTableConfig);
 
         Field tableConfigCacheField = PinotLLCRealtimeSegmentManager.class.getDeclaredField("_tableConfigCache");
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
index e6037aa..aefaf3e 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
@@ -28,8 +28,7 @@ import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.partition.PartitionAssignment;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaAvroMessageDecoder;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.core.realtime.stream.StreamConfig;
 import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
 import org.testng.Assert;
@@ -271,24 +270,7 @@ public class FlushThresholdUpdaterTest {
     TableConfig realtimeTableConfig;
 
     FlushThresholdUpdater flushThresholdUpdater;
-    Map<String, String> streamConfigs = new HashMap<>();
-    String streamType = "kafka";
-    String streamTopic = "aTopic";
-    String consumerFactoryClass = KafkaConsumerFactory.class.getName();
-    String decoderClass = KafkaAvroMessageDecoder.class.getName();
-    streamConfigs.put(StreamConfigProperties.STREAM_TYPE, streamType);
-    streamConfigs
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
-            streamTopic);
-    streamConfigs
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            StreamConfig.ConsumerType.LOWLEVEL.toString());
-    streamConfigs.put(StreamConfigProperties
-            .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
-        consumerFactoryClass);
-    streamConfigs
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
-            decoderClass);
+    Map<String, String> streamConfigs = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
     tableConfigBuilder.setStreamConfigs(streamConfigs);
 
     // flush size set
@@ -500,7 +482,6 @@ public class FlushThresholdUpdaterTest {
     String tableName = "fakeTable_REALTIME";
     int seqNum = 0;
     long startOffset = 0;
-    long committingSegmentSizeBytes;
     CommittingSegmentDescriptor committingSegmentDescriptor;
     long now = System.currentTimeMillis();
     long seg0time = now - 1334_650;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalanceStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalanceStrategyTest.java
index 2dc07c0..0f6b5f1 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalanceStrategyTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalanceStrategyTest.java
@@ -41,8 +41,8 @@ import org.apache.pinot.common.partition.PartitionAssignment;
 import org.apache.pinot.common.partition.StreamPartitionAssignmentGenerator;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaAvroMessageDecoder;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
 import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
@@ -581,26 +581,15 @@ public class DefaultRebalanceStrategyTest {
     CommonConstants.Helix.TableType tableTypeFromTableName = TableNameBuilder.getTableTypeFromTableName(tableName);
     when(mockTableConfig.getTableType()).thenReturn(tableTypeFromTableName);
 
-    Map<String, String> streamConfigMap = new HashMap<>(1);
-    String streamType = "kafka";
-    String topic = "aTopic";
-    String consumerFactoryClass = KafkaConsumerFactory.class.getName();
-    String decoderClass = KafkaAvroMessageDecoder.class.getName();
-    streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
-            topic);
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            consumerTypesCSV);
-    streamConfigMap.put(StreamConfigProperties
-            .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
-        consumerFactoryClass);
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
-            decoderClass);
+    StreamConfig streamConfig;
+    if (StreamConfig.ConsumerType.HIGHLEVEL.toString().equalsIgnoreCase(consumerTypesCSV)) {
+      streamConfig = FakeStreamConfigUtils.getDefaultHighLevelStreamConfigs();
+    } else {
+      streamConfig = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs();
+    }
+
     IndexingConfig mockIndexConfig = mock(IndexingConfig.class);
-    when(mockIndexConfig.getStreamConfigs()).thenReturn(streamConfigMap);
+    when(mockIndexConfig.getStreamConfigs()).thenReturn(streamConfig.getStreamConfigsMap());
     when(mockTableConfig.getIndexingConfig()).thenReturn(mockIndexConfig);
 
     return mockTableConfig;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
index 009116f..3c96a20 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
@@ -40,8 +40,7 @@ import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.controller.utils.ReplicaGroupTestUtils;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaAvroMessageDecoder;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStreamConfigProperties;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.core.realtime.stream.StreamConfig;
 import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
 import org.testng.Assert;
@@ -253,17 +252,7 @@ public class SegmentAssignmentStrategyTest extends ControllerTest {
     Schema schema = new Schema.SchemaBuilder().setSchemaName(rawTableName).build();
     _helixResourceManager.addOrUpdateSchema(schema);
 
-    Map<String, String> streamConfigMap = new HashMap<>();
-    String type = "kafka";
-    streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, type);
-    streamConfigMap.put(StreamConfigProperties.constructStreamProperty(type, StreamConfigProperties.STREAM_TOPIC_NAME),
-        "test");
-    streamConfigMap.put(
-        StreamConfigProperties.constructStreamProperty(type, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-        StreamConfig.ConsumerType.LOWLEVEL.toString());
-    streamConfigMap.put(
-        StreamConfigProperties.constructStreamProperty(type, StreamConfigProperties.STREAM_DECODER_CLASS),
-        KafkaAvroMessageDecoder.class.getName());
+    Map<String, String> streamConfigMap = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
 
     // Adding a table without replica group
     TableConfig tableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index ee2ce4a..3d7bfce 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -25,9 +25,7 @@ import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.LinkedList;
-import java.util.Map;
 import org.apache.commons.io.FileUtils;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
@@ -36,13 +34,13 @@ import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.core.data.GenericRow;
 import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig;
 import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
 import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;
 import org.apache.pinot.core.realtime.stream.PermanentConsumerException;
 import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
-import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
 import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -50,8 +48,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 
 // TODO Write more tests for other parts of the class
@@ -85,17 +82,17 @@ public class LLRealtimeSegmentDataManagerTest {
           + "    \"segmentFormatVersion\": null, \n" + "    \"sortedColumn\": [], \n" + "    \"streamConfigs\": {\n"
           + "      \"" + StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS + "\": \"" + String
           .valueOf(maxRowsInSegment) + "\", \n" + "      \"" + StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME
-          + "\": \"" + maxTimeForSegmentCloseMs + "\", \n" + "      \"stream.kafka.broker.list\": \"broker:7777\", \n"
-          + "      \"stream.kafka.consumer.prop.auto.offset.reset\": \"smallest\", \n"
-          + "      \"stream.kafka.consumer.type\": \"simple\", \n"
-          + "      \"stream.kafka.consumer.factory.class.name\": \"org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory\", \n"
-          + "      \"stream.kafka.decoder.class.name\": \"" + FakeStreamMessageDecoder.class.getName() + "\", \n"
-          + "      \"stream.kafka.decoder.prop.schema.registry.rest.url\": \"http://schema-registry-host.corp.ceo:1766/schemas\", \n"
-          + "      \"stream.kafka.decoder.prop.schema.registry.schema.name\": \"UnknownSchema\", \n"
-          + "      \"stream.kafka.hlc.zk.connect.string\": \"zoo:2181/kafka-queuing\", \n"
-          + "      \"stream.kafka.topic.name\": \"" + _topicName + "\", \n"
-          + "      \"stream.kafka.zk.broker.url\": \"kafka-broker:2181/kafka-queuing\", \n"
-          + "      \"streamType\": \"kafka\"\n" + "    }\n" + "  }, \n" + "  \"tableName\": \"Coffee_REALTIME\", \n"
+          + "\": \"" + maxTimeForSegmentCloseMs + "\", \n" + "      \"stream.fakeStream.broker.list\": \"broker:7777\", \n"
+          + "      \"stream.fakeStream.consumer.prop.auto.offset.reset\": \"smallest\", \n"
+          + "      \"stream.fakeStream.consumer.type\": \"simple\", \n"
+          + "      \"stream.fakeStream.consumer.factory.class.name\": \"" + FakeStreamConsumerFactory.class.getName()+ "\", \n"
+          + "      \"stream.fakeStream.decoder.class.name\": \"" + FakeStreamMessageDecoder.class.getName() + "\", \n"
+          + "      \"stream.fakeStream.decoder.prop.schema.registry.rest.url\": \"http://schema-registry-host.corp.ceo:1766/schemas\", \n"
+          + "      \"stream.fakeStream.decoder.prop.schema.registry.schema.name\": \"UnknownSchema\", \n"
+          + "      \"stream.fakeStream.hlc.zk.connect.string\": \"zoo:2181/kafka-queuing\", \n"
+          + "      \"stream.fakeStream.topic.name\": \"" + _topicName + "\", \n"
+          + "      \"stream.fakeStream.zk.broker.url\": \"kafka-broker:2181/kafka-queuing\", \n"
+          + "      \"streamType\": \"fakeStream\"\n" + "    }\n" + "  }, \n" + "  \"tableName\": \"Coffee_REALTIME\", \n"
           + "  \"tableType\": \"realtime\", \n" + "  \"tenants\": {\n" + "    \"broker\": \"shared\", \n"
           + "    \"server\": \"server-1\"\n" + "  }\n" + "}";
 
@@ -133,25 +130,6 @@ public class LLRealtimeSegmentDataManagerTest {
     return segmentZKMetadata;
   }
 
-  public static class FakeStreamMessageDecoder implements StreamMessageDecoder<byte[]> {
-
-    @Override
-    public void init(Map<String, String> props, Schema indexingSchema, String topicName)
-        throws Exception {
-
-    }
-
-    @Override
-    public GenericRow decode(byte[] payload, GenericRow destination) {
-      return null;
-    }
-
-    @Override
-    public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
-      return null;
-    }
-  }
-
   private FakeLLRealtimeSegmentDataManager createFakeSegmentManager()
       throws Exception {
     LLCRealtimeSegmentZKMetadata segmentZKMetadata = createZkMetadata();
@@ -717,7 +695,7 @@ public class LLRealtimeSegmentDataManagerTest {
     protected boolean consumeLoop()
         throws Exception {
       if (_throwExceptionFromConsume) {
-        throw new PermanentConsumerException(Errors.OFFSET_OUT_OF_RANGE.exception());
+        throw new PermanentConsumerException(new Throwable("Offset out of range"));
       }
       setCurrentOffset(_consumeOffsets.remove());
       terminateLoopIfNecessary();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionLevelConsumer.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionLevelConsumer.java
new file mode 100644
index 0000000..3695022
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionLevelConsumer.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.fakestream;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.realtime.stream.MessageBatch;
+import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.core.util.AvroUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils.*;
+
+
+/**
+ * Implementation of {@link PartitionLevelConsumer} for fake stream
+ * Unpacks the tar file /resources/data/fakestream/fake_stream_avro_data.tar.gz as the source of messages
+ */
+public class FakePartitionLevelConsumer implements PartitionLevelConsumer {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FakePartitionLevelConsumer.class.getName());
+
+  private List<Integer> messageOffsets = new ArrayList<>();
+  private List<byte[]> messageBytes = new ArrayList<>();
+
+  FakePartitionLevelConsumer(int partition, StreamConfig streamConfig) {
+
+    // TODO: this logic can move to a FakeStreamProducer instead of being inside the Consumer
+    File tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName());
+    File outputDir = new File(tempDir, String.valueOf(partition));
+
+    int offset = 0;
+
+    try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(65536)) {
+      File avroFile = unpackAvroTarFile(outputDir).get(0);
+
+      int numPartitions = FakeStreamConfigUtils.getNumPartitions(streamConfig);
+
+      try (DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(avroFile)) {
+        BinaryEncoder binaryEncoder = new EncoderFactory().directBinaryEncoder(outputStream, null);
+        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(reader.getSchema());
+
+        int recordNumber = 0;
+        for (GenericRecord genericRecord : reader) {
+          if (getPartitionNumber(recordNumber++, numPartitions) != partition) {
+            continue;
+          }
+          outputStream.reset();
+
+          datumWriter.write(genericRecord, binaryEncoder);
+          binaryEncoder.flush();
+
+          byte[] bytes = outputStream.toByteArray();
+          // contiguous offsets
+          messageOffsets.add(offset++);
+          messageBytes.add(bytes);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Could not create {}", FakePartitionLevelConsumer.class.getName(), e);
+    } finally {
+      FileUtils.deleteQuietly(outputDir);
+    }
+  }
+
+  @Override
+  public MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis) throws TimeoutException {
+    if (startOffset >= FakeStreamConfigUtils.getLargestOffset()) {
+      return new FakeStreamMessageBatch(Collections.emptyList(), Collections.emptyList());
+    }
+    if (startOffset < FakeStreamConfigUtils.getSmallestOffset()) {
+      startOffset = FakeStreamConfigUtils.getSmallestOffset();
+    }
+    if (endOffset > FakeStreamConfigUtils.getLargestOffset()) {
+      endOffset = FakeStreamConfigUtils.getLargestOffset();
+    }
+    return new FakeStreamMessageBatch(messageOffsets.subList((int) startOffset, (int) endOffset),
+        messageBytes.subList((int) startOffset, (int) endOffset));
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  /**
+   * Partitions the raw data
+   * This can be abstracted out and injected via stream configs to incorporate custom partitioning logic
+   */
+  public int getPartitionNumber(int recordNumber, int numPartitions) {
+    return recordNumber % numPartitions;
+  }
+}
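
A short usage sketch for the consumer above, with illustrative values and
imports as in FakeStreamConsumerFactory below. Creation goes through the
factory (the constructor is package-private). Note that fetchMessages only
clamps the window against the smallest/largest offset constants, so the
requested range should stay within the unpacked data:

    StreamConfig streamConfig = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs();
    StreamConsumerFactory factory = StreamConsumerFactoryProvider.create(streamConfig);
    PartitionLevelConsumer consumer = factory.createPartitionLevelConsumer("test-client", 0);
    // Fetch messages [0, 50) from partition 0; may throw TimeoutException.
    MessageBatch batch = consumer.fetchMessages(0L, 50L, 10_000);
    consumer.close();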
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
new file mode 100644
index 0000000..5505b02
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.fakestream;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+
+
+/**
+ * Helper methods to provide fake stream details
+ * TODO: make input tar file and pinot schema configurable
+ */
+public class FakeStreamConfigUtils {
+  private static final String AVRO_TAR_FILE = "fake_stream_avro_data.tar.gz";
+  // This avro schema file must be in sync with the avro data
+  private static final String AVRO_SCHEMA_FILE = "fake_stream_avro_schema.avsc";
+  private static final String PINOT_SCHEMA_FILE = "fake_stream_pinot_schema.json";
+
+  private static final int SMALLEST_OFFSET = 0;
+  private static final int LARGEST_OFFSET = Integer.MAX_VALUE;
+  private static final String NUM_PARTITIONS_KEY = "num.partitions";
+  private static final int DEFAULT_NUM_PARTITIONS = 2;
+
+  private static final String STREAM_TYPE = "fakeStream";
+  private static final String TOPIC_NAME = "fakeTopic";
+  private static final String CONSUMER_FACTORY_CLASS = FakeStreamConsumerFactory.class.getName();
+  private static final String OFFSET_CRITERIA = "smallest";
+  private static final String DECODER_CLASS = FakeStreamMessageDecoder.class.getName();
+  private static final int SEGMENT_FLUSH_THRESHOLD_ROWS = 500;
+
+  /**
+   * Gets the number of partitions from the stream config, falling back to the default
+   */
+  static int getNumPartitions(StreamConfig streamConfig) {
+    Map<String, String> streamConfigsMap = streamConfig.getStreamConfigsMap();
+    String numPartitionsKey =
+        StreamConfigProperties.constructStreamProperty(streamConfig.getType(), NUM_PARTITIONS_KEY);
+    if (streamConfigsMap.containsKey(numPartitionsKey)) {
+      return Integer.parseInt(streamConfigsMap.get(numPartitionsKey));
+    }
+    return DEFAULT_NUM_PARTITIONS;
+  }
+
+  /**
+   * Gets the smallest offset of the fake stream
+   */
+  static int getSmallestOffset() {
+    return SMALLEST_OFFSET;
+  }
+
+  /**
+   * Gets the largest offset of the fake stream
+   */
+  static int getLargestOffset() {
+    return LARGEST_OFFSET;
+  }
+
+  /**
+   * Unpacks avro tar file
+   */
+  static List<File> unpackAvroTarFile(File outputDir) throws Exception {
+    if (outputDir.exists()) {
+      FileUtils.deleteDirectory(outputDir);
+    }
+    File avroTarFile = getResourceFile(AVRO_TAR_FILE);
+    return TarGzCompressionUtils.unTar(avroTarFile, outputDir);
+  }
+
+  /**
+   * Gets avro schema
+   */
+  static File getAvroSchemaFile() {
+    return getResourceFile(AVRO_SCHEMA_FILE);
+  }
+
+  /**
+   * Gets pinot schema
+   */
+  static Schema getPinotSchema() throws IOException {
+    File schemaFile = getResourceFile(PINOT_SCHEMA_FILE);
+    return Schema.fromFile(schemaFile);
+  }
+
+  private static File getResourceFile(String fileName) {
+    URL resourceURL = FakeStreamConfigUtils.class.getClassLoader().getResource("data/fakestream");
+    Assert.assertNotNull(resourceURL);
+    return new File(TestUtils.getFileFromResourceUrl(resourceURL), fileName);
+  }
+
+  /**
+   * Generate fake stream configs for low level stream with custom number of partitions
+   */
+  public static StreamConfig getDefaultLowLevelStreamConfigs(int numPartitions) {
+    Map<String, String> streamConfigMap = getDefaultStreamConfigs();
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+        StreamConfig.ConsumerType.LOWLEVEL.toString());
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, NUM_PARTITIONS_KEY),
+        String.valueOf(numPartitions));
+
+    return new StreamConfig(streamConfigMap);
+  }
+
+  /**
+   * Generate fake stream configs for low level stream
+   */
+  public static StreamConfig getDefaultLowLevelStreamConfigs() {
+    return getDefaultLowLevelStreamConfigs(DEFAULT_NUM_PARTITIONS);
+  }
+
+  /**
+   * Generate fake stream configs for high level stream
+   */
+  public static StreamConfig getDefaultHighLevelStreamConfigs() {
+    Map<String, String> streamConfigMap = getDefaultStreamConfigs();
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+        StreamConfig.ConsumerType.HIGHLEVEL.toString());
+
+    return new StreamConfig(streamConfigMap);
+  }
+
+  private static Map<String, String> getDefaultStreamConfigs() {
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, STREAM_TYPE);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_TOPIC_NAME),
+        TOPIC_NAME);
+    streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
+        StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), CONSUMER_FACTORY_CLASS);
+    streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
+        StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), OFFSET_CRITERIA);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_DECODER_CLASS),
+        DECODER_CLASS);
+    streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS,
+        Integer.toString(SEGMENT_FLUSH_THRESHOLD_ROWS));
+    return streamConfigMap;
+  }
+}
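
A hedged sketch of what the helper hands back. The exact key strings are
an assumption, inferred from the stream.fakeStream.* properties used in
the LLRealtimeSegmentDataManagerTest hunk above:

    StreamConfig lowLevel = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(4); // 4 partitions
    Map<String, String> map = lowLevel.getStreamConfigsMap();
    // Keys follow the "stream.fakeStream.<suffix>" convention, e.g.:
    //   streamType                           -> "fakeStream"
    //   stream.fakeStream.topic.name         -> "fakeTopic"
    //   stream.fakeStream.decoder.class.name -> FakeStreamMessageDecoder's class name
    //   stream.fakeStream.num.partitions     -> "4"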
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
new file mode 100644
index 0000000..92ab3e3
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.fakestream;
+
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.realtime.stream.MessageBatch;
+import org.apache.pinot.core.realtime.stream.OffsetCriteria;
+import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
+import org.apache.pinot.core.realtime.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.core.realtime.stream.StreamDecoderProvider;
+import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
+import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
+
+
+/**
+ * Implementation of {@link StreamConsumerFactory} for a fake stream
+ * Data source is /resources/data/fakestream/fake_stream_avro_data.tar.gz
+ * Avro schema is /resources/data/fakestream/fake_stream_avro_schema.avsc
+ * Pinot schema is /resources/data/fakestream/fake_stream_pinot_schema.json
+ */
+public class FakeStreamConsumerFactory extends StreamConsumerFactory {
+
+  @Override
+  public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {
+    return new FakePartitionLevelConsumer(partition, _streamConfig);
+  }
+
+  @Override
+  public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema,
+      InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) {
+    return new FakeStreamLevelConsumer();
+  }
+
+  @Override
+  public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
+    return new FakeStreamMetadataProvider(_streamConfig);
+  }
+
+  @Override
+  public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
+    return new FakeStreamMetadataProvider(_streamConfig);
+  }
+
+  public static void main(String[] args) throws Exception {
+    String clientId = "client_id_localhost_tester";
+
+    // stream config
+    int numPartitions = 5;
+    StreamConfig streamConfig = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(numPartitions);
+
+    // stream consumer factory
+    StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+
+    // stream metadata provider
+    StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(clientId);
+    int partitionCount = streamMetadataProvider.fetchPartitionCount(10_000);
+    System.out.println(partitionCount);
+
+    // Partition metadata provider
+    int partition = 3;
+    StreamMetadataProvider partitionMetadataProvider =
+        streamConsumerFactory.createPartitionMetadataProvider(clientId, partition);
+    long partitionOffset =
+        partitionMetadataProvider.fetchPartitionOffset(OffsetCriteria.SMALLEST_OFFSET_CRITERIA, 10_000);
+    System.out.println(partitionOffset);
+
+    // Partition level consumer
+    PartitionLevelConsumer partitionLevelConsumer =
+        streamConsumerFactory.createPartitionLevelConsumer(clientId, partition);
+    MessageBatch messageBatch = partitionLevelConsumer.fetchMessages(10, 40, 10_000);
+
+    // Message decoder
+    Schema pinotSchema = FakeStreamConfigUtils.getPinotSchema();
+    StreamMessageDecoder streamMessageDecoder = StreamDecoderProvider.create(streamConfig, pinotSchema);
+    GenericRow decodedRow = new GenericRow();
+    streamMessageDecoder.decode(messageBatch.getMessageAtIndex(0), decodedRow);
+    System.out.println(decodedRow);
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamLevelConsumer.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamLevelConsumer.java
new file mode 100644
index 0000000..bf16371
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamLevelConsumer.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.fakestream;
+
+import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
+
+
+/**
+ * Test implementation of {@link StreamLevelConsumer}
+ * This is currently a no-op
+ */
+public class FakeStreamLevelConsumer implements StreamLevelConsumer {
+  @Override
+  public void start() throws Exception {
+
+  }
+
+  @Override
+  public GenericRow next(GenericRow destination) {
+    return destination;
+  }
+
+  @Override
+  public void commit() {
+
+  }
+
+  @Override
+  public void shutdown() throws Exception {
+
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java
new file mode 100644
index 0000000..4afd6da
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.fakestream;
+
+import java.util.List;
+import org.apache.pinot.core.realtime.stream.MessageBatch;
+
+
+/**
+ * MessageBatch implementation for the fake stream
+ */
+public class FakeStreamMessageBatch implements MessageBatch<byte[]> {
+  private List<Integer> _messageOffsets;
+  private List<byte[]> _messageBytes;
+
+  FakeStreamMessageBatch(List<Integer> messageOffsets, List<byte[]> messageBytes) {
+    _messageOffsets = messageOffsets;
+    _messageBytes = messageBytes;
+  }
+
+  public int getMessageCount() {
+    return _messageOffsets.size();
+  }
+
+  public byte[] getMessageAtIndex(int index) {
+    return _messageBytes.get(index);
+  }
+
+  public int getMessageOffsetAtIndex(int index) {
+    return _messageOffsets.get(index);
+  }
+
+  public int getMessageLengthAtIndex(int index) {
+    return _messageBytes.get(index).length;
+  }
+
+  public long getNextStreamMessageOffsetAtIndex(int index) {
+    return _messageOffsets.get(index) + 1;
+  }
+}
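
For reference, the iteration pattern this batch supports, as a hedged
sketch; the direct construction assumes a caller in the same package,
since the constructor is package-private:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.pinot.core.realtime.stream.MessageBatch;

    List<Integer> offsets = Arrays.asList(0, 1, 2);
    List<byte[]> payloads = Arrays.asList(new byte[]{10}, new byte[]{20}, new byte[]{30});
    MessageBatch<byte[]> batch = new FakeStreamMessageBatch(offsets, payloads);
    for (int i = 0; i < batch.getMessageCount(); i++) {
      byte[] payload = batch.getMessageAtIndex(i);                  // raw message bytes
      long nextOffset = batch.getNextStreamMessageOffsetAtIndex(i); // offset + 1, offsets are contiguous
    }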
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java
new file mode 100644
index 0000000..946fbbc
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.fakestream;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.realtime.impl.kafka.AvroRecordToPinotRowGenerator;
+import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * StreamMessageDecoder implementation for fake stream
+ */
+public class FakeStreamMessageDecoder implements StreamMessageDecoder<byte[]> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FakeStreamMessageDecoder.class);
+
+  private org.apache.avro.Schema _avroSchema;
+  private DatumReader<GenericData.Record> _datumReader;
+  private AvroRecordToPinotRowGenerator _avroRecordConverter;
+  private BinaryDecoder _binaryDecoderToReuse;
+  private GenericData.Record _avroRecordToReuse;
+
+  @Override
+  public void init(Map props, Schema indexingSchema, String topicName) throws Exception {
+    _avroSchema = new org.apache.avro.Schema.Parser().parse(FakeStreamConfigUtils.getAvroSchemaFile());
+    _datumReader = new GenericDatumReader<>(_avroSchema);
+    _avroRecordConverter = new AvroRecordToPinotRowGenerator(indexingSchema);
+  }
+
+  @Override
+  public GenericRow decode(byte[] payload, GenericRow destination) {
+    return decode(payload, 0, payload.length, destination);
+  }
+
+  @Override
+  public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
+    _binaryDecoderToReuse = DecoderFactory.get().binaryDecoder(payload, offset, length, _binaryDecoderToReuse);
+    try {
+      _avroRecordToReuse = _datumReader.read(_avroRecordToReuse, _binaryDecoderToReuse);
+    } catch (IOException e) {
+      LOGGER.error("Caught exception while reading message using schema: {}", _avroSchema, e);
+      return null;
+    }
+    return _avroRecordConverter.transform(_avroRecordToReuse, destination);
+  }
+}
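
A hedged decode sketch for the decoder above. It assumes a streamConfig
and a fetched MessageBatch<byte[]> named batch, as in the earlier
sketches, a caller in the fakestream package (getPinotSchema is
package-private), and a test method declared "throws Exception"; the
topic name argument is illustrative, since init ignores it here:

    FakeStreamMessageDecoder decoder = new FakeStreamMessageDecoder();
    decoder.init(streamConfig.getStreamConfigsMap(), FakeStreamConfigUtils.getPinotSchema(), "fakeTopic");
    GenericRow reuse = new GenericRow();
    for (int i = 0; i < batch.getMessageCount(); i++) {
      GenericRow row = decoder.decode(batch.getMessageAtIndex(i), reuse); // null if the Avro read fails
    }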
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
new file mode 100644
index 0000000..a75660d
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.fakestream;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nonnull;
+import org.apache.pinot.core.realtime.stream.OffsetCriteria;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
+import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
+
+
+/**
+ * StreamMetadataProvider implementation for the fake stream
+ */
+public class FakeStreamMetadataProvider implements StreamMetadataProvider {
+  private int _numPartitions;
+
+  public FakeStreamMetadataProvider(StreamConfig streamConfig) {
+    _numPartitions = FakeStreamConfigUtils.getNumPartitions(streamConfig);
+  }
+
+  @Override
+  public int fetchPartitionCount(long timeoutMillis) {
+    return _numPartitions;
+  }
+
+  @Override
+  public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) throws TimeoutException {
+    if (offsetCriteria.isSmallest()) {
+      return FakeStreamConfigUtils.getSmallestOffset();
+    } else {
+      return FakeStreamConfigUtils.getLargestOffset();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
index 3345487..45acc92 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
@@ -22,8 +22,9 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.common.utils.DataSize;
 import org.apache.pinot.common.utils.time.TimeUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaAvroMessageDecoder;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -37,12 +38,13 @@ public class StreamConfigTest {
   public void testStreamConfig() {
     boolean exception = false;
     StreamConfig streamConfig;
-    String streamType = "kafka";
-    String topic = "aTopic";
+    String streamType = "fakeStream";
+    String topic = "fakeTopic";
     String consumerType = StreamConfig.ConsumerType.LOWLEVEL.toString();
-    String consumerFactoryClassName = KafkaConsumerFactory.class.getName();
-    String decoderClass = KafkaAvroMessageDecoder.class.getName();
+    String consumerFactoryClass = FakeStreamConsumerFactory.class.getName();
+    String decoderClass = FakeStreamMessageDecoder.class.getName();
 
+    // Empty map: construction must fail since all mandatory properties are missing
     try {
       Map<String, String> streamConfigMap = new HashMap<>();
       streamConfig = new StreamConfig(streamConfigMap);
@@ -54,18 +56,16 @@ public class StreamConfigTest {
     // All mandatory properties set
     Map<String, String> streamConfigMap = new HashMap<>();
     streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
-            topic);
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            consumerType);
-    streamConfigMap.put(StreamConfigProperties
-            .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
-        consumerFactoryClassName);
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
-            decoderClass);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), topic);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+        consumerType);
+    streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+        StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), consumerFactoryClass);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+        decoderClass);
     streamConfig = new StreamConfig(streamConfigMap);
 
     // Missing streamType
@@ -80,8 +80,8 @@ public class StreamConfigTest {
 
     // Missing stream topic
     streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
-    streamConfigMap
-        .remove(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME));
+    streamConfigMap.remove(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME));
     exception = false;
     try {
       streamConfig = new StreamConfig(streamConfigMap);
@@ -91,9 +91,8 @@ public class StreamConfigTest {
     Assert.assertTrue(exception);
 
     // Missing consumer type
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
-            topic);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), topic);
     streamConfigMap.remove(
         StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES));
     exception = false;
@@ -105,11 +104,11 @@ public class StreamConfigTest {
     Assert.assertTrue(exception);
 
     // Missing consumer factory
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            consumerType);
-    streamConfigMap.remove(StreamConfigProperties
-        .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS));
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+        consumerType);
+    streamConfigMap.remove(StreamConfigProperties.constructStreamProperty(streamType,
+        StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS));
     exception = false;
     try {
       streamConfig = new StreamConfig(streamConfigMap);
@@ -120,9 +119,8 @@ public class StreamConfigTest {
     Assert.assertEquals(streamConfig.getConsumerFactoryClassName(), StreamConfig.getDefaultConsumerFactoryClassName());
 
     // Missing decoder class
-    streamConfigMap.put(StreamConfigProperties
-            .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
-        consumerFactoryClassName);
+    streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+        StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), consumerFactoryClass);
     streamConfigMap.remove(
         StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS));
     exception = false;
@@ -133,14 +131,14 @@ public class StreamConfigTest {
     }
     Assert.assertTrue(exception);
 
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
-            decoderClass);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+        decoderClass);
     streamConfig = new StreamConfig(streamConfigMap);
     Assert.assertEquals(streamConfig.getType(), streamType);
     Assert.assertEquals(streamConfig.getTopicName(), topic);
     Assert.assertEquals(streamConfig.getConsumerTypes().get(0), StreamConfig.ConsumerType.LOWLEVEL);
-    Assert.assertEquals(streamConfig.getConsumerFactoryClassName(), consumerFactoryClassName);
+    Assert.assertEquals(streamConfig.getConsumerFactoryClassName(), consumerFactoryClass);
     Assert.assertEquals(streamConfig.getDecoderClass(), decoderClass);
   }
 
@@ -149,44 +147,42 @@ public class StreamConfigTest {
    */
   @Test
   public void testStreamConfigDefaults() {
-    String streamType = "kafka";
-    String topic = "aTopic";
+    String streamType = "fakeStream";
+    String topic = "fakeTopic";
     String consumerType = "simple";
-    String consumerFactoryClassName = KafkaConsumerFactory.class.getName();
-    String decoderClass = KafkaAvroMessageDecoder.class.getName();
+    String consumerFactoryClass = FakeStreamConsumerFactory.class.getName();
+    String decoderClass = FakeStreamMessageDecoder.class.getName();
 
     Map<String, String> streamConfigMap = new HashMap<>();
     streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
-            topic);
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            consumerType);
-    streamConfigMap.put(StreamConfigProperties
-            .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
-        consumerFactoryClassName);
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
-            decoderClass);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), topic);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+        consumerType);
+    streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+        StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), consumerFactoryClass);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+        decoderClass);
 
     // Mandatory values + defaults
     StreamConfig streamConfig = new StreamConfig(streamConfigMap);
     Assert.assertEquals(streamConfig.getType(), streamType);
     Assert.assertEquals(streamConfig.getTopicName(), topic);
     Assert.assertEquals(streamConfig.getConsumerTypes().get(0), StreamConfig.ConsumerType.LOWLEVEL);
-    Assert.assertEquals(streamConfig.getConsumerFactoryClassName(), consumerFactoryClassName);
+    Assert.assertEquals(streamConfig.getConsumerFactoryClassName(), consumerFactoryClass);
     Assert.assertEquals(streamConfig.getDecoderClass(), decoderClass);
     Assert.assertEquals(streamConfig.getDecoderProperties().size(), 0);
-    Assert
-        .assertEquals(streamConfig.getOffsetCriteria(), new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest());
-    Assert
-        .assertEquals(streamConfig.getConnectionTimeoutMillis(), StreamConfig.DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS);
+    Assert.assertEquals(streamConfig.getOffsetCriteria(),
+        new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest());
+    Assert.assertEquals(streamConfig.getConnectionTimeoutMillis(),
+        StreamConfig.DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS);
     Assert.assertEquals(streamConfig.getFetchTimeoutMillis(), StreamConfig.DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS);
     Assert.assertEquals(streamConfig.getFlushThresholdRows(), StreamConfig.getDefaultFlushThresholdRows());
     Assert.assertEquals(streamConfig.getFlushThresholdTimeMillis(), StreamConfig.getDefaultFlushThresholdTimeMillis());
-    Assert
-        .assertEquals(streamConfig.getFlushSegmentDesiredSizeBytes(), StreamConfig.getDefaultDesiredSegmentSizeBytes());
+    Assert.assertEquals(streamConfig.getFlushSegmentDesiredSizeBytes(),
+        StreamConfig.getDefaultDesiredSegmentSizeBytes());
 
     consumerType = "lowLevel,highLevel";
     String offsetCriteria = "smallest";
@@ -197,17 +193,16 @@ public class StreamConfigTest {
     String flushThresholdTime = "2h";
     String flushThresholdRows = "500";
     String flushSegmentSize = "20M";
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            consumerType);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+        consumerType);
     streamConfigMap.put(
         StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.DECODER_PROPS_PREFIX) + "."
             + decoderProp1Key, decoderProp1Value);
-    streamConfigMap.put(StreamConfigProperties
-        .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), offsetCriteria);
-    streamConfigMap.put(StreamConfigProperties
-            .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONNECTION_TIMEOUT_MILLIS),
-        connectionTimeout);
+    streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+        StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), offsetCriteria);
+    streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+        StreamConfigProperties.STREAM_CONNECTION_TIMEOUT_MILLIS), connectionTimeout);
     streamConfigMap.put(
         StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS),
         fetchTimeout);
@@ -220,11 +215,11 @@ public class StreamConfigTest {
     Assert.assertEquals(streamConfig.getTopicName(), topic);
     Assert.assertEquals(streamConfig.getConsumerTypes().get(0), StreamConfig.ConsumerType.LOWLEVEL);
     Assert.assertEquals(streamConfig.getConsumerTypes().get(1), StreamConfig.ConsumerType.HIGHLEVEL);
-    Assert.assertEquals(streamConfig.getConsumerFactoryClassName(), consumerFactoryClassName);
+    Assert.assertEquals(streamConfig.getConsumerFactoryClassName(), consumerFactoryClass);
     Assert.assertEquals(streamConfig.getDecoderClass(), decoderClass);
     Assert.assertEquals(streamConfig.getDecoderProperties().size(), 1);
     Assert.assertEquals(streamConfig.getDecoderProperties().get(decoderProp1Key), decoderProp1Value);
-    Assert.assertEquals(streamConfig.getOffsetCriteria().isSmallest(), true);
+    Assert.assertTrue(streamConfig.getOffsetCriteria().isSmallest());
     Assert.assertEquals(streamConfig.getConnectionTimeoutMillis(), Long.parseLong(connectionTimeout));
     Assert.assertEquals(streamConfig.getFetchTimeoutMillis(), Integer.parseInt(fetchTimeout));
     Assert.assertEquals(streamConfig.getFlushThresholdRows(), Integer.parseInt(flushThresholdRows));
@@ -250,33 +245,31 @@ public class StreamConfigTest {
   public void testStreamConfigValidations() {
     boolean exception;
     StreamConfig streamConfig;
-    String streamType = "kafka";
-    String topic = "aTopic";
+    String streamType = "fakeStream";
+    String topic = "fakeTopic";
     String consumerType = "simple";
-    String consumerFactoryClassName = KafkaConsumerFactory.class.getName();
-    String decoderClass = KafkaAvroMessageDecoder.class.getName();
+    String consumerFactoryClass = FakeStreamConsumerFactory.class.getName();
+    String decoderClass = FakeStreamMessageDecoder.class.getName();
 
     // All mandatory properties set
     Map<String, String> streamConfigMap = new HashMap<>();
     streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
-            topic);
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            consumerType);
-    streamConfigMap.put(StreamConfigProperties
-            .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
-        consumerFactoryClassName);
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
-            decoderClass);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), topic);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+        consumerType);
+    streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+        StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), consumerFactoryClass);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+        decoderClass);
     streamConfig = new StreamConfig(streamConfigMap);
 
     // Invalid consumer type
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            "invalidConsumerType");
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+        "invalidConsumerType");
     exception = false;
     try {
       streamConfig = new StreamConfig(streamConfigMap);
@@ -286,9 +279,9 @@ public class StreamConfigTest {
     Assert.assertTrue(exception);
 
     // Invalid fetch timeout
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            consumerType);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+        consumerType);
     streamConfigMap.put(
         StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS),
         "timeout");
@@ -298,15 +291,15 @@ public class StreamConfigTest {
     // Invalid connection timeout
     streamConfigMap.remove(
         StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS));
-    streamConfigMap.put(StreamConfigProperties
-        .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONNECTION_TIMEOUT_MILLIS), "timeout");
+    streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+        StreamConfigProperties.STREAM_CONNECTION_TIMEOUT_MILLIS), "timeout");
     streamConfig = new StreamConfig(streamConfigMap);
-    Assert
-        .assertEquals(streamConfig.getConnectionTimeoutMillis(), StreamConfig.DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS);
+    Assert.assertEquals(streamConfig.getConnectionTimeoutMillis(),
+        StreamConfig.DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS);
 
     // Invalid flush threshold rows
-    streamConfigMap.remove(StreamConfigProperties
-        .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONNECTION_TIMEOUT_MILLIS));
+    streamConfigMap.remove(StreamConfigProperties.constructStreamProperty(streamType,
+        StreamConfigProperties.STREAM_CONNECTION_TIMEOUT_MILLIS));
     streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, "rows");
     streamConfig = new StreamConfig(streamConfigMap);
     Assert.assertEquals(streamConfig.getFlushThresholdRows(), StreamConfig.getDefaultFlushThresholdRows());
@@ -321,8 +314,8 @@ public class StreamConfigTest {
     streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME);
     streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_DESIRED_SIZE, "size");
     streamConfig = new StreamConfig(streamConfigMap);
-    Assert
-        .assertEquals(streamConfig.getFlushSegmentDesiredSizeBytes(), StreamConfig.getDefaultDesiredSegmentSizeBytes());
+    Assert.assertEquals(streamConfig.getFlushSegmentDesiredSizeBytes(),
+        StreamConfig.getDefaultDesiredSegmentSizeBytes());
   }
 
   /**
@@ -331,11 +324,11 @@ public class StreamConfigTest {
   @Test
   public void testFlushThresholdStreamConfigs() {
     StreamConfig streamConfig;
-    String streamType = "kafka";
-    String topic = "aTopic";
+    String streamType = "fakeStream";
+    String topic = "fakeTopic";
     String consumerType = "lowlevel";
-    String consumerFactoryClassName = KafkaConsumerFactory.class.getName();
-    String decoderClass = KafkaAvroMessageDecoder.class.getName();
+    String consumerFactoryClass = FakeStreamConsumerFactory.class.getName();
+    String decoderClass = FakeStreamMessageDecoder.class.getName();
     String flushThresholdRows = "200";
     String flushThresholdRowsLLC = "400";
     String flushThresholdTime = "2h";
@@ -343,18 +336,16 @@ public class StreamConfigTest {
 
     Map<String, String> streamConfigMap = new HashMap<>();
     streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
-            topic);
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            consumerType);
-    streamConfigMap.put(StreamConfigProperties
-            .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
-        consumerFactoryClassName);
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
-            decoderClass);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), topic);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+        consumerType);
+    streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+        StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), consumerFactoryClass);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+        decoderClass);
 
     // use defaults if nothing provided
     streamConfig = new StreamConfig(streamConfigMap);
@@ -406,62 +397,60 @@ public class StreamConfigTest {
     streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS);
     streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME);
     partitionLevelStreamConfig = new PartitionLevelStreamConfig(streamConfigMap);
-    Assert
-        .assertEquals(partitionLevelStreamConfig.getFlushThresholdRows(), StreamConfig.getDefaultFlushThresholdRows());
+    Assert.assertEquals(partitionLevelStreamConfig.getFlushThresholdRows(),
+        StreamConfig.getDefaultFlushThresholdRows());
     Assert.assertEquals(partitionLevelStreamConfig.getFlushThresholdTimeMillis(),
         StreamConfig.getDefaultFlushThresholdTimeMillis());
   }
 
   @Test
   public void testConsumerTypes() {
-    String streamType = "kafka";
-    String topic = "aTopic";
-    String consumerFactoryClassName = KafkaConsumerFactory.class.getName();
-    String decoderClass = KafkaAvroMessageDecoder.class.getName();
+    String streamType = "fakeStream";
+    String topic = "fakeTopic";
+    String consumerFactoryClass = FakeStreamConsumerFactory.class.getName();
+    String decoderClass = FakeStreamMessageDecoder.class.getName();
 
     Map<String, String> streamConfigMap = new HashMap<>();
     streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
-            topic);
-    streamConfigMap.put(StreamConfigProperties
-            .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
-        consumerFactoryClassName);
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
-            decoderClass);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), topic);
+    streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+        StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), consumerFactoryClass);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+        decoderClass);
 
     String consumerType = "simple";
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            consumerType);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+        consumerType);
     StreamConfig streamConfig = new StreamConfig(streamConfigMap);
     Assert.assertEquals(streamConfig.getConsumerTypes().get(0), StreamConfig.ConsumerType.LOWLEVEL);
     Assert.assertTrue(streamConfig.hasLowLevelConsumerType());
     Assert.assertFalse(streamConfig.hasHighLevelConsumerType());
 
     consumerType = "lowlevel";
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            consumerType);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+        consumerType);
     streamConfig = new StreamConfig(streamConfigMap);
     Assert.assertEquals(streamConfig.getConsumerTypes().get(0), StreamConfig.ConsumerType.LOWLEVEL);
     Assert.assertTrue(streamConfig.hasLowLevelConsumerType());
     Assert.assertFalse(streamConfig.hasHighLevelConsumerType());
 
     consumerType = "highLevel";
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            consumerType);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+        consumerType);
     streamConfig = new StreamConfig(streamConfigMap);
     Assert.assertEquals(streamConfig.getConsumerTypes().get(0), StreamConfig.ConsumerType.HIGHLEVEL);
     Assert.assertFalse(streamConfig.hasLowLevelConsumerType());
     Assert.assertTrue(streamConfig.hasHighLevelConsumerType());
 
     consumerType = "highLevel,simple";
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            consumerType);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+        consumerType);
     streamConfig = new StreamConfig(streamConfigMap);
     Assert.assertEquals(streamConfig.getConsumerTypes().get(0), StreamConfig.ConsumerType.HIGHLEVEL);
     Assert.assertEquals(streamConfig.getConsumerTypes().get(1), StreamConfig.ConsumerType.LOWLEVEL);
@@ -469,9 +458,9 @@ public class StreamConfigTest {
     Assert.assertTrue(streamConfig.hasHighLevelConsumerType());
 
     consumerType = "highLevel,lowlevel";
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            consumerType);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+        consumerType);
     streamConfig = new StreamConfig(streamConfigMap);
     Assert.assertEquals(streamConfig.getConsumerTypes().get(0), StreamConfig.ConsumerType.HIGHLEVEL);
     Assert.assertEquals(streamConfig.getConsumerTypes().get(1), StreamConfig.ConsumerType.LOWLEVEL);
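
All of the map plumbing this test repeats reduces to one naming rule: constructStreamProperty joins the stream prefix, the stream type, and the property suffix with dots, so the topic key above materializes as "stream.fakeStream.topic.name". A condensed sketch of the minimal valid map the test keeps rebuilding (the wrapper class is illustrative; the key construction mirrors the test exactly):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory;
    import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;

    public class MinimalStreamConfigSketch {
      static Map<String, String> minimalConfig() {
        Map<String, String> cfg = new HashMap<>();
        cfg.put(StreamConfigProperties.STREAM_TYPE, "fakeStream");
        cfg.put(StreamConfigProperties.constructStreamProperty("fakeStream",
            StreamConfigProperties.STREAM_TOPIC_NAME), "fakeTopic");  // "stream.fakeStream.topic.name"
        cfg.put(StreamConfigProperties.constructStreamProperty("fakeStream",
            StreamConfigProperties.STREAM_CONSUMER_TYPES), "lowlevel");
        cfg.put(StreamConfigProperties.constructStreamProperty("fakeStream",
            StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
            FakeStreamConsumerFactory.class.getName());
        cfg.put(StreamConfigProperties.constructStreamProperty("fakeStream",
            StreamConfigProperties.STREAM_DECODER_CLASS),
            FakeStreamMessageDecoder.class.getName());
        return cfg;  // new StreamConfig(cfg) now succeeds; remove any entry and it throws
      }
    }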
diff --git a/pinot-core/src/test/resources/data/fakestream/fake_stream_avro_data.tar.gz b/pinot-core/src/test/resources/data/fakestream/fake_stream_avro_data.tar.gz
new file mode 100644
index 0000000..994ad9f
Binary files /dev/null and b/pinot-core/src/test/resources/data/fakestream/fake_stream_avro_data.tar.gz differ
diff --git a/pinot-core/src/test/resources/data/fakestream/fake_stream_avro_schema.avsc b/pinot-core/src/test/resources/data/fakestream/fake_stream_avro_schema.avsc
new file mode 100644
index 0000000..0213a04
--- /dev/null
+++ b/pinot-core/src/test/resources/data/fakestream/fake_stream_avro_schema.avsc
@@ -0,0 +1,524 @@
+{
+  "type": "record",
+  "name": "Flight",
+  "namespace": "pinot",
+  "fields": [
+    {
+      "name": "DaysSinceEpoch",
+      "type": [
+        "int"
+      ]
+    },
+    {
+      "name": "Year",
+      "type": [
+        "int"
+      ]
+    },
+    {
+      "name": "Quarter",
+      "type": [
+        "int"
+      ]
+    },
+    {
+      "name": "Month",
+      "type": [
+        "int"
+      ]
+    },
+    {
+      "name": "DayofMonth",
+      "type": [
+        "int"
+      ]
+    },
+    {
+      "name": "DayOfWeek",
+      "type": [
+        "int"
+      ]
+    },
+    {
+      "name": "FlightDate",
+      "type": [
+        "string"
+      ]
+    },
+    {
+      "name": "UniqueCarrier",
+      "type": [
+        "string"
+      ]
+    },
+    {
+      "name": "AirlineID",
+      "type": [
+        "long"
+      ]
+    },
+    {
+      "name": "Carrier",
+      "type": [
+        "string"
+      ]
+    },
+    {
+      "name": "TailNum",
+      "type": [
+        "string",
+        "null"
+      ]
+    },
+    {
+      "name": "FlightNum",
+      "type": [
+        "int"
+      ]
+    },
+    {
+      "name": "OriginAirportID",
+      "type": [
+        "int"
+      ]
+    },
+    {
+      "name": "OriginAirportSeqID",
+      "type": [
+        "int"
+      ]
+    },
+    {
+      "name": "OriginCityMarketID",
+      "type": [
+        "int"
+      ]
+    },
+    {
+      "name": "Origin",
+      "type": [
+        "string"
+      ]
+    },
+    {
+      "name": "OriginCityName",
+      "type": [
+        "string"
+      ]
+    },
+    {
+      "name": "OriginState",
+      "type": [
+        "string"
+      ]
+    },
+    {
+      "name": "OriginStateFips",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "OriginStateName",
+      "type": [
+        "string"
+      ]
+    },
+    {
+      "name": "OriginWac",
+      "type": [
+        "int"
+      ]
+    },
+    {
+      "name": "DestAirportID",
+      "type": [
+        "int"
+      ]
+    },
+    {
+      "name": "DestAirportSeqID",
+      "type": [
+        "int"
+      ]
+    },
+    {
+      "name": "DestCityMarketID",
+      "type": [
+        "int"
+      ]
+    },
+    {
+      "name": "Dest",
+      "type": [
+        "string"
+      ]
+    },
+    {
+      "name": "DestCityName",
+      "type": [
+        "string"
+      ]
+    },
+    {
+      "name": "DestState",
+      "type": [
+        "string"
+      ]
+    },
+    {
+      "name": "DestStateFips",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "DestStateName",
+      "type": [
+        "string"
+      ]
+    },
+    {
+      "name": "DestWac",
+      "type": [
+        "int"
+      ]
+    },
+    {
+      "name": "CRSDepTime",
+      "type": [
+        "int"
+      ]
+    },
+    {
+      "name": "DepTime",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "DepDelay",
+      "type": [
+        "double",
+        "null"
+      ]
+    },
+    {
+      "name": "DepDelayMinutes",
+      "type": [
+        "float",
+        "null"
+      ]
+    },
+    {
+      "name": "DepDel15",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "DepartureDelayGroups",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "DepTimeBlk",
+      "type": [
+        "string"
+      ]
+    },
+    {
+      "name": "TaxiOut",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "WheelsOff",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "WheelsOn",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "TaxiIn",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "CRSArrTime",
+      "type": [
+        "int"
+      ]
+    },
+    {
+      "name": "ArrTime",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "ArrDelay",
+      "type": [
+        "float",
+        "null"
+      ]
+    },
+    {
+      "name": "ArrDelayMinutes",
+      "type": [
+        "double",
+        "null"
+      ]
+    },
+    {
+      "name": "ArrDel15",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "ArrivalDelayGroups",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "ArrTimeBlk",
+      "type": [
+        "string"
+      ]
+    },
+    {
+      "name": "Cancelled",
+      "type": [
+        "int"
+      ]
+    },
+    {
+      "name": "CancellationCode",
+      "type": [
+        "string",
+        "null"
+      ]
+    },
+    {
+      "name": "Diverted",
+      "type": [
+        "int"
+      ]
+    },
+    {
+      "name": "CRSElapsedTime",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "ActualElapsedTime",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "AirTime",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "Flights",
+      "type": [
+        "int"
+      ]
+    },
+    {
+      "name": "Distance",
+      "type": [
+        "int"
+      ]
+    },
+    {
+      "name": "DistanceGroup",
+      "type": [
+        "int"
+      ]
+    },
+    {
+      "name": "CarrierDelay",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "WeatherDelay",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "NASDelay",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "SecurityDelay",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "LateAircraftDelay",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "FirstDepTime",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "TotalAddGTime",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "LongestAddGTime",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "DivAirportLandings",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "DivReachedDest",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "DivActualElapsedTime",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "DivArrDelay",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "DivDistance",
+      "type": [
+        "int",
+        "null"
+      ]
+    },
+    {
+      "name": "DivAirports",
+      "type": {
+        "type": "array",
+        "items": "string"
+      }
+    },
+    {
+      "name": "DivAirportIDs",
+      "type": {
+        "type": "array",
+        "items": "int"
+      }
+    },
+    {
+      "name": "DivAirportSeqIDs",
+      "type": {
+        "type": "array",
+        "items": "int"
+      }
+    },
+    {
+      "name": "DivWheelsOns",
+      "type": {
+        "type": "array",
+        "items": "int"
+      }
+    },
+    {
+      "name": "DivTotalGTimes",
+      "type": {
+        "type": "array",
+        "items": "long"
+      }
+    },
+    {
+      "name": "DivLongestGTimes",
+      "type": {
+        "type": "array",
+        "items": "float"
+      }
+    },
+    {
+      "name": "DivWheelsOffs",
+      "type": {
+        "type": "array",
+        "items": "int"
+      }
+    },
+    {
+      "name": "DivTailNums",
+      "type": {
+        "type": "array",
+        "items": "string"
+      }
+    },
+    {
+      "name": "RandomAirports",
+      "type": {
+        "type": "array",
+        "items": "string"
+      }
+    }
+  ]
+}
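
This is the schema FakeStreamConfigUtils hands to the decoder. Given the resource path added in this patch, a test can parse it directly with the standard Avro parser; a small sketch:

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.avro.Schema;

    public class AvroSchemaLoadSketch {
      static Schema load() throws IOException {
        // Classpath location follows from src/test/resources in this patch
        InputStream in = AvroSchemaLoadSketch.class
            .getResourceAsStream("/data/fakestream/fake_stream_avro_schema.avsc");
        return new Schema.Parser().parse(in);
      }
    }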
diff --git a/pinot-core/src/test/resources/data/fakestream/fake_stream_pinot_schema.json b/pinot-core/src/test/resources/data/fakestream/fake_stream_pinot_schema.json
new file mode 100644
index 0000000..f17cb53
--- /dev/null
+++ b/pinot-core/src/test/resources/data/fakestream/fake_stream_pinot_schema.json
@@ -0,0 +1,335 @@
+{
+  "schemaName": "mytable",
+  "dimensionFieldSpecs": [
+    {
+      "name": "AirlineID",
+      "dataType": "LONG"
+    },
+    {
+      "name": "ArrTime",
+      "dataType": "INT"
+    },
+    {
+      "name": "ArrTimeBlk",
+      "dataType": "STRING"
+    },
+    {
+      "name": "CRSArrTime",
+      "dataType": "INT"
+    },
+    {
+      "name": "CRSDepTime",
+      "dataType": "INT"
+    },
+    {
+      "name": "CRSElapsedTime",
+      "dataType": "INT"
+    },
+    {
+      "name": "CancellationCode",
+      "dataType": "STRING"
+    },
+    {
+      "name": "Carrier",
+      "dataType": "STRING"
+    },
+    {
+      "name": "DayOfWeek",
+      "dataType": "INT"
+    },
+    {
+      "name": "DayofMonth",
+      "dataType": "INT"
+    },
+    {
+      "name": "DepTime",
+      "dataType": "INT"
+    },
+    {
+      "name": "DepTimeBlk",
+      "dataType": "STRING"
+    },
+    {
+      "name": "Dest",
+      "dataType": "STRING"
+    },
+    {
+      "name": "DestAirportID",
+      "dataType": "INT"
+    },
+    {
+      "name": "DestAirportSeqID",
+      "dataType": "INT"
+    },
+    {
+      "name": "DestCityMarketID",
+      "dataType": "INT"
+    },
+    {
+      "name": "DestCityName",
+      "dataType": "STRING"
+    },
+    {
+      "name": "DestState",
+      "dataType": "STRING"
+    },
+    {
+      "name": "DestStateFips",
+      "dataType": "INT"
+    },
+    {
+      "name": "DestStateName",
+      "dataType": "STRING"
+    },
+    {
+      "name": "DestWac",
+      "dataType": "INT"
+    },
+    {
+      "name": "Distance",
+      "dataType": "INT"
+    },
+    {
+      "name": "DistanceGroup",
+      "dataType": "INT"
+    },
+    {
+      "name": "DivActualElapsedTime",
+      "dataType": "INT"
+    },
+    {
+      "name": "DivAirportIDs",
+      "dataType": "INT",
+      "singleValueField": false
+    },
+    {
+      "name": "DivAirportLandings",
+      "dataType": "INT"
+    },
+    {
+      "name": "DivAirportSeqIDs",
+      "dataType": "INT",
+      "singleValueField": false
+    },
+    {
+      "name": "DivAirports",
+      "dataType": "STRING",
+      "singleValueField": false
+    },
+    {
+      "name": "DivArrDelay",
+      "dataType": "INT"
+    },
+    {
+      "name": "DivDistance",
+      "dataType": "INT"
+    },
+    {
+      "name": "DivLongestGTimes",
+      "dataType": "FLOAT",
+      "singleValueField": false
+    },
+    {
+      "name": "DivReachedDest",
+      "dataType": "INT"
+    },
+    {
+      "name": "DivTailNums",
+      "dataType": "STRING",
+      "singleValueField": false
+    },
+    {
+      "name": "DivTotalGTimes",
+      "dataType": "LONG",
+      "singleValueField": false
+    },
+    {
+      "name": "DivWheelsOffs",
+      "dataType": "INT",
+      "singleValueField": false
+    },
+    {
+      "name": "DivWheelsOns",
+      "dataType": "INT",
+      "singleValueField": false
+    },
+    {
+      "name": "Diverted",
+      "dataType": "INT"
+    },
+    {
+      "name": "FirstDepTime",
+      "dataType": "INT"
+    },
+    {
+      "name": "FlightDate",
+      "dataType": "STRING"
+    },
+    {
+      "name": "FlightNum",
+      "dataType": "INT"
+    },
+    {
+      "name": "Flights",
+      "dataType": "INT"
+    },
+    {
+      "name": "LongestAddGTime",
+      "dataType": "INT"
+    },
+    {
+      "name": "Month",
+      "dataType": "INT"
+    },
+    {
+      "name": "Origin",
+      "dataType": "STRING"
+    },
+    {
+      "name": "OriginAirportID",
+      "dataType": "INT"
+    },
+    {
+      "name": "OriginAirportSeqID",
+      "dataType": "INT"
+    },
+    {
+      "name": "OriginCityMarketID",
+      "dataType": "INT"
+    },
+    {
+      "name": "OriginCityName",
+      "dataType": "STRING"
+    },
+    {
+      "name": "OriginState",
+      "dataType": "STRING"
+    },
+    {
+      "name": "OriginStateFips",
+      "dataType": "INT"
+    },
+    {
+      "name": "OriginStateName",
+      "dataType": "STRING"
+    },
+    {
+      "name": "OriginWac",
+      "dataType": "INT"
+    },
+    {
+      "name": "Quarter",
+      "dataType": "INT"
+    },
+    {
+      "name": "RandomAirports",
+      "dataType": "STRING",
+      "singleValueField": false
+    },
+    {
+      "name": "TailNum",
+      "dataType": "STRING"
+    },
+    {
+      "name": "TaxiIn",
+      "dataType": "INT"
+    },
+    {
+      "name": "TaxiOut",
+      "dataType": "INT"
+    },
+    {
+      "name": "Year",
+      "dataType": "INT"
+    },
+    {
+      "name": "WheelsOn",
+      "dataType": "INT"
+    },
+    {
+      "name": "WheelsOff",
+      "dataType": "INT"
+    },
+    {
+      "name": "UniqueCarrier",
+      "dataType": "STRING"
+    },
+    {
+      "name": "TotalAddGTime",
+      "dataType": "INT"
+    }
+  ],
+  "metricFieldSpecs": [
+    {
+      "name": "ActualElapsedTime",
+      "dataType": "INT"
+    },
+    {
+      "name": "AirTime",
+      "dataType": "INT"
+    },
+    {
+      "name": "ArrDel15",
+      "dataType": "INT"
+    },
+    {
+      "name": "ArrDelay",
+      "dataType": "FLOAT"
+    },
+    {
+      "name": "ArrivalDelayGroups",
+      "dataType": "INT"
+    },
+    {
+      "name": "ArrDelayMinutes",
+      "dataType": "DOUBLE"
+    },
+    {
+      "name": "Cancelled",
+      "dataType": "INT"
+    },
+    {
+      "name": "CarrierDelay",
+      "dataType": "INT"
+    },
+    {
+      "name": "DepDel15",
+      "dataType": "INT"
+    },
+    {
+      "name": "DepDelay",
+      "dataType": "DOUBLE"
+    },
+    {
+      "name": "DepDelayMinutes",
+      "dataType": "FLOAT"
+    },
+    {
+      "name": "DepartureDelayGroups",
+      "dataType": "INT"
+    },
+    {
+      "name": "LateAircraftDelay",
+      "dataType": "INT"
+    },
+    {
+      "name": "NASDelay",
+      "dataType": "INT"
+    },
+    {
+      "name": "SecurityDelay",
+      "dataType": "INT"
+    },
+    {
+      "name": "WeatherDelay",
+      "dataType": "INT"
+    }
+  ],
+  "timeFieldSpec": {
+    "incomingGranularitySpec": {
+      "name": "DaysSinceEpoch",
+      "dataType": "INT",
+      "timeType": "DAYS"
+    }
+  }
+}
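
The matching Pinot schema deserializes through the usual factory in pinot-common; a sketch assuming the Schema.fromFile API of this era, with a relative path that assumes the pinot-core module root as the working directory:

    import java.io.File;
    import org.apache.pinot.common.data.Schema;

    public class PinotSchemaLoadSketch {
      static Schema load() throws Exception {
        return Schema.fromFile(
            new File("src/test/resources/data/fakestream/fake_stream_pinot_schema.json"));
      }
    }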

