You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2019/05/28 17:23:44 UTC
[incubator-pinot] branch master updated: Remove dependency on
Kafka stream implementation classes from pinot-core and pinot-controller
tests (#4233)
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fcfcd88 Remove dependency on Kafka stream implementation classes from pinot-core and pinot-controller tests (#4233)
fcfcd88 is described below
commit fcfcd88266c163270169a1ffcfca3d61869c8e59
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Tue May 28 10:23:37 2019 -0700
Remove dependency on Kafka stream implementation classes from pinot-core and pinot-controller tests (#4233)
* Remove dependency on Kafka stream implementation classes from pinot-core and pinot-controller tests
* Add FakeStream to be used in tests across the board
---
.../resources/PinotTableRestletResourceTest.java | 29 +-
.../controller/api/resources/TableViewsTest.java | 26 +-
.../PinotLLCRealtimeSegmentManagerTest.java | 51 +-
.../segment/FlushThresholdUpdaterTest.java | 23 +-
.../rebalance/DefaultRebalanceStrategyTest.java | 31 +-
.../sharding/SegmentAssignmentStrategyTest.java | 15 +-
.../realtime/LLRealtimeSegmentDataManagerTest.java | 52 +-
.../fakestream/FakePartitionLevelConsumer.java | 121 +++++
.../impl/fakestream/FakeStreamConfigUtils.java | 168 +++++++
.../impl/fakestream/FakeStreamConsumerFactory.java | 101 ++++
.../impl/fakestream/FakeStreamLevelConsumer.java | 49 ++
.../impl/fakestream/FakeStreamMessageBatch.java | 56 +++
.../impl/fakestream/FakeStreamMessageDecoder.java | 71 +++
.../fakestream/FakeStreamMetadataProvider.java | 59 +++
.../core/realtime/stream/StreamConfigTest.java | 277 ++++++-----
.../data/fakestream/fake_stream_avro_data.tar.gz | Bin 0 -> 809609 bytes
.../data/fakestream/fake_stream_avro_schema.avsc | 524 +++++++++++++++++++++
.../data/fakestream/fake_stream_pinot_schema.json | 335 +++++++++++++
18 files changed, 1657 insertions(+), 331 deletions(-)
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
index 5dbe230..fe00c6a 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
@@ -21,8 +21,6 @@ package org.apache.pinot.controller.api.resources;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.pinot.common.config.QuotaConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
@@ -31,9 +29,8 @@ import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
import org.apache.pinot.controller.helix.ControllerTest;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStreamConfigProperties;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -75,30 +72,10 @@ public class PinotTableRestletResourceTest extends ControllerTest {
// add schema for realtime table
addDummySchema(REALTIME_TABLE_NAME);
-
- Map<String, String> streamConfigs = new HashMap<>();
- String streamType = "kafka";
- streamConfigs.put(StreamConfigProperties.STREAM_TYPE, streamType);
- streamConfigs
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
- StreamConfig.ConsumerType.HIGHLEVEL.toString());
- streamConfigs
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
- "fakeTopic");
- streamConfigs
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
- "fakeClass");
- streamConfigs.put(StreamConfigProperties
- .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), "fakeClass");
- streamConfigs.put(KafkaStreamConfigProperties
- .constructStreamProperty(KafkaStreamConfigProperties.HighLevelConsumer.KAFKA_HLC_ZK_CONNECTION_STRING),
- "fakeUrl");
- streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, Integer.toString(1234));
- streamConfigs.put(StreamConfigProperties
- .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), "smallest");
+ StreamConfig streamConfig = FakeStreamConfigUtils.getDefaultHighLevelStreamConfigs();
_realtimeBuilder.setTableName(REALTIME_TABLE_NAME).setTimeColumnName("timeColumn").setTimeType("DAYS")
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("5").setSchemaName(REALTIME_TABLE_NAME)
- .setStreamConfigs(streamConfigs);
+ .setStreamConfigs(streamConfig.getStreamConfigsMap());
}
@Test
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/TableViewsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/TableViewsTest.java
index 4ac9418..ee14650 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/TableViewsTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/TableViewsTest.java
@@ -20,7 +20,6 @@ package org.apache.pinot.controller.api.resources;
import java.net.HttpURLConnection;
import java.net.URL;
-import java.util.HashMap;
import java.util.Map;
import org.apache.helix.InstanceType;
import org.apache.pinot.common.config.TableConfig;
@@ -31,10 +30,8 @@ import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaAvroMessageDecoder;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -79,26 +76,9 @@ public class TableViewsTest extends ControllerTest {
// add schema for realtime table
addDummySchema(HYBRID_TABLE_NAME);
- Map<String, String> streamConfigs = new HashMap<>();
- String streamType = "kafka";
- String topic = "aTopic";
- String consumerFactoryClass = KafkaConsumerFactory.class.getName();
- String decoderClass = KafkaAvroMessageDecoder.class.getName();
- streamConfigs.put(StreamConfigProperties.STREAM_TYPE, streamType);
- streamConfigs
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
- topic);
- streamConfigs
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
- StreamConfig.ConsumerType.HIGHLEVEL.toString());
- streamConfigs.put(StreamConfigProperties
- .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
- consumerFactoryClass);
- streamConfigs
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
- decoderClass);
+ StreamConfig streamConfig = FakeStreamConfigUtils.getDefaultHighLevelStreamConfigs();
tableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(HYBRID_TABLE_NAME)
- .setNumReplicas(2).setStreamConfigs(streamConfigs).build();
+ .setNumReplicas(2).setStreamConfigs(streamConfig.getStreamConfigsMap()).build();
_helixResourceManager.addTable(tableConfig);
// Wait for external view get updated
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 646d3bc..04e2b7b 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -63,9 +63,7 @@ import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.controller.util.SegmentCompletionUtils;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaAvroMessageDecoder;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStreamConfigProperties;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.core.realtime.stream.OffsetCriteria;
import org.apache.pinot.core.realtime.stream.StreamConfig;
import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
@@ -1268,32 +1266,10 @@ public class PinotLLCRealtimeSegmentManagerTest {
when(mockValidationConfig.getReplicasPerPartition()).thenReturn(Integer.toString(nReplicas));
when(mockValidationConfig.getReplicasPerPartitionNumber()).thenReturn(nReplicas);
when(mockTableConfig.getValidationConfig()).thenReturn(mockValidationConfig);
- Map<String, String> streamConfigMap = new HashMap<>(1);
- String streamType = "kafka";
- streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
- String topic = "aTopic";
- String consumerFactoryClass = KafkaConsumerFactory.class.getName();
- String decoderClass = KafkaAvroMessageDecoder.class.getName();
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
- topic);
- streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, "100000");
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
- "simple");
- streamConfigMap.put(StreamConfigProperties
- .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
- consumerFactoryClass);
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
- decoderClass);
-
- final String bootstrapHostConfigKey = KafkaStreamConfigProperties
- .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST);
- streamConfigMap.put(bootstrapHostConfigKey, bootstrapHosts);
+ StreamConfig streamConfig = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs();
IndexingConfig mockIndexConfig = mock(IndexingConfig.class);
- when(mockIndexConfig.getStreamConfigs()).thenReturn(streamConfigMap);
+ when(mockIndexConfig.getStreamConfigs()).thenReturn(streamConfig.getStreamConfigsMap());
when(mockTableConfig.getIndexingConfig()).thenReturn(mockIndexConfig);
TenantConfig mockTenantConfig = mock(TenantConfig.class);
@@ -1303,24 +1279,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
return mockTableConfig;
}
- private static Map<String, String> getStreamConfigs() {
- Map<String, String> streamPropMap = new HashMap<>(1);
- String streamType = "kafka";
- streamPropMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
- String topic = "aTopic";
- streamPropMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
- topic);
- streamPropMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
- "simple");
- streamPropMap.put(StreamConfigProperties
- .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), "smallest");
- streamPropMap.put(KafkaStreamConfigProperties
- .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST), "host:1234");
- return streamPropMap;
- }
-
//////////////////////////////////////////////////////////////////////////////////
// Fake classes
/////////////////////////////////////////////////////////////////////////////////
@@ -1372,7 +1330,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
TableConfig mockTableConfig = mock(TableConfig.class);
IndexingConfig mockIndexingConfig = mock(IndexingConfig.class);
when(mockTableConfig.getIndexingConfig()).thenReturn(mockIndexingConfig);
- when(mockIndexingConfig.getStreamConfigs()).thenReturn(getStreamConfigs());
+ StreamConfig streamConfig = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs();
+ when(mockIndexingConfig.getStreamConfigs()).thenReturn(streamConfig.getStreamConfigsMap());
when(mockCache.getTableConfig(anyString())).thenReturn(mockTableConfig);
Field tableConfigCacheField = PinotLLCRealtimeSegmentManager.class.getDeclaredField("_tableConfigCache");
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
index e6037aa..aefaf3e 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
@@ -28,8 +28,7 @@ import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.partition.PartitionAssignment;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaAvroMessageDecoder;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.core.realtime.stream.StreamConfig;
import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
import org.testng.Assert;
@@ -271,24 +270,7 @@ public class FlushThresholdUpdaterTest {
TableConfig realtimeTableConfig;
FlushThresholdUpdater flushThresholdUpdater;
- Map<String, String> streamConfigs = new HashMap<>();
- String streamType = "kafka";
- String streamTopic = "aTopic";
- String consumerFactoryClass = KafkaConsumerFactory.class.getName();
- String decoderClass = KafkaAvroMessageDecoder.class.getName();
- streamConfigs.put(StreamConfigProperties.STREAM_TYPE, streamType);
- streamConfigs
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
- streamTopic);
- streamConfigs
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
- StreamConfig.ConsumerType.LOWLEVEL.toString());
- streamConfigs.put(StreamConfigProperties
- .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
- consumerFactoryClass);
- streamConfigs
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
- decoderClass);
+ Map<String, String> streamConfigs = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
tableConfigBuilder.setStreamConfigs(streamConfigs);
// flush size set
@@ -500,7 +482,6 @@ public class FlushThresholdUpdaterTest {
String tableName = "fakeTable_REALTIME";
int seqNum = 0;
long startOffset = 0;
- long committingSegmentSizeBytes;
CommittingSegmentDescriptor committingSegmentDescriptor;
long now = System.currentTimeMillis();
long seg0time = now - 1334_650;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalanceStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalanceStrategyTest.java
index 2dc07c0..0f6b5f1 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalanceStrategyTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalanceStrategyTest.java
@@ -41,8 +41,8 @@ import org.apache.pinot.common.partition.PartitionAssignment;
import org.apache.pinot.common.partition.StreamPartitionAssignmentGenerator;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaAvroMessageDecoder;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
@@ -581,26 +581,15 @@ public class DefaultRebalanceStrategyTest {
CommonConstants.Helix.TableType tableTypeFromTableName = TableNameBuilder.getTableTypeFromTableName(tableName);
when(mockTableConfig.getTableType()).thenReturn(tableTypeFromTableName);
- Map<String, String> streamConfigMap = new HashMap<>(1);
- String streamType = "kafka";
- String topic = "aTopic";
- String consumerFactoryClass = KafkaConsumerFactory.class.getName();
- String decoderClass = KafkaAvroMessageDecoder.class.getName();
- streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
- topic);
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
- consumerTypesCSV);
- streamConfigMap.put(StreamConfigProperties
- .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
- consumerFactoryClass);
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
- decoderClass);
+ StreamConfig streamConfig;
+ if (StreamConfig.ConsumerType.HIGHLEVEL.toString().equalsIgnoreCase(consumerTypesCSV)) {
+ streamConfig = FakeStreamConfigUtils.getDefaultHighLevelStreamConfigs();
+ } else {
+ streamConfig = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs();
+ }
+
IndexingConfig mockIndexConfig = mock(IndexingConfig.class);
- when(mockIndexConfig.getStreamConfigs()).thenReturn(streamConfigMap);
+ when(mockIndexConfig.getStreamConfigs()).thenReturn(streamConfig.getStreamConfigsMap());
when(mockTableConfig.getIndexingConfig()).thenReturn(mockIndexConfig);
return mockTableConfig;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
index 009116f..3c96a20 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/sharding/SegmentAssignmentStrategyTest.java
@@ -40,8 +40,7 @@ import org.apache.pinot.controller.helix.ControllerRequestBuilderUtil;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.controller.utils.ReplicaGroupTestUtils;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaAvroMessageDecoder;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStreamConfigProperties;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.core.realtime.stream.StreamConfig;
import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
import org.testng.Assert;
@@ -253,17 +252,7 @@ public class SegmentAssignmentStrategyTest extends ControllerTest {
Schema schema = new Schema.SchemaBuilder().setSchemaName(rawTableName).build();
_helixResourceManager.addOrUpdateSchema(schema);
- Map<String, String> streamConfigMap = new HashMap<>();
- String type = "kafka";
- streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, type);
- streamConfigMap.put(StreamConfigProperties.constructStreamProperty(type, StreamConfigProperties.STREAM_TOPIC_NAME),
- "test");
- streamConfigMap.put(
- StreamConfigProperties.constructStreamProperty(type, StreamConfigProperties.STREAM_CONSUMER_TYPES),
- StreamConfig.ConsumerType.LOWLEVEL.toString());
- streamConfigMap.put(
- StreamConfigProperties.constructStreamProperty(type, StreamConfigProperties.STREAM_DECODER_CLASS),
- KafkaAvroMessageDecoder.class.getName());
+ Map<String, String> streamConfigMap = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap();
// Adding a table without replica group
TableConfig tableConfig = new TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index ee2ce4a..3d7bfce 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -25,9 +25,7 @@ import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.LinkedList;
-import java.util.Map;
import org.apache.commons.io.FileUtils;
-import org.apache.kafka.common.protocol.Errors;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
@@ -36,13 +34,13 @@ import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.core.data.GenericRow;
import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig;
import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;
import org.apache.pinot.core.realtime.stream.PermanentConsumerException;
import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
-import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -50,8 +48,7 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
// TODO Write more tests for other parts of the class
@@ -85,17 +82,17 @@ public class LLRealtimeSegmentDataManagerTest {
+ " \"segmentFormatVersion\": null, \n" + " \"sortedColumn\": [], \n" + " \"streamConfigs\": {\n"
+ " \"" + StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS + "\": \"" + String
.valueOf(maxRowsInSegment) + "\", \n" + " \"" + StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME
- + "\": \"" + maxTimeForSegmentCloseMs + "\", \n" + " \"stream.kafka.broker.list\": \"broker:7777\", \n"
- + " \"stream.kafka.consumer.prop.auto.offset.reset\": \"smallest\", \n"
- + " \"stream.kafka.consumer.type\": \"simple\", \n"
- + " \"stream.kafka.consumer.factory.class.name\": \"org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory\", \n"
- + " \"stream.kafka.decoder.class.name\": \"" + FakeStreamMessageDecoder.class.getName() + "\", \n"
- + " \"stream.kafka.decoder.prop.schema.registry.rest.url\": \"http://schema-registry-host.corp.ceo:1766/schemas\", \n"
- + " \"stream.kafka.decoder.prop.schema.registry.schema.name\": \"UnknownSchema\", \n"
- + " \"stream.kafka.hlc.zk.connect.string\": \"zoo:2181/kafka-queuing\", \n"
- + " \"stream.kafka.topic.name\": \"" + _topicName + "\", \n"
- + " \"stream.kafka.zk.broker.url\": \"kafka-broker:2181/kafka-queuing\", \n"
- + " \"streamType\": \"kafka\"\n" + " }\n" + " }, \n" + " \"tableName\": \"Coffee_REALTIME\", \n"
+ + "\": \"" + maxTimeForSegmentCloseMs + "\", \n" + " \"stream.fakeStream.broker.list\": \"broker:7777\", \n"
+ + " \"stream.fakeStream.consumer.prop.auto.offset.reset\": \"smallest\", \n"
+ + " \"stream.fakeStream.consumer.type\": \"simple\", \n"
+ + " \"stream.fakeStream.consumer.factory.class.name\": \"" + FakeStreamConsumerFactory.class.getName()+ "\", \n"
+ + " \"stream.fakeStream.decoder.class.name\": \"" + FakeStreamMessageDecoder.class.getName() + "\", \n"
+ + " \"stream.fakeStream.decoder.prop.schema.registry.rest.url\": \"http://schema-registry-host.corp.ceo:1766/schemas\", \n"
+ + " \"stream.fakeStream.decoder.prop.schema.registry.schema.name\": \"UnknownSchema\", \n"
+ + " \"stream.fakeStream.hlc.zk.connect.string\": \"zoo:2181/kafka-queuing\", \n"
+ + " \"stream.fakeStream.topic.name\": \"" + _topicName + "\", \n"
+ + " \"stream.fakeStream.zk.broker.url\": \"kafka-broker:2181/kafka-queuing\", \n"
+ + " \"streamType\": \"fakeStream\"\n" + " }\n" + " }, \n" + " \"tableName\": \"Coffee_REALTIME\", \n"
+ " \"tableType\": \"realtime\", \n" + " \"tenants\": {\n" + " \"broker\": \"shared\", \n"
+ " \"server\": \"server-1\"\n" + " }\n" + "}";
@@ -133,25 +130,6 @@ public class LLRealtimeSegmentDataManagerTest {
return segmentZKMetadata;
}
- public static class FakeStreamMessageDecoder implements StreamMessageDecoder<byte[]> {
-
- @Override
- public void init(Map<String, String> props, Schema indexingSchema, String topicName)
- throws Exception {
-
- }
-
- @Override
- public GenericRow decode(byte[] payload, GenericRow destination) {
- return null;
- }
-
- @Override
- public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
- return null;
- }
- }
-
private FakeLLRealtimeSegmentDataManager createFakeSegmentManager()
throws Exception {
LLCRealtimeSegmentZKMetadata segmentZKMetadata = createZkMetadata();
@@ -717,7 +695,7 @@ public class LLRealtimeSegmentDataManagerTest {
protected boolean consumeLoop()
throws Exception {
if (_throwExceptionFromConsume) {
- throw new PermanentConsumerException(Errors.OFFSET_OUT_OF_RANGE.exception());
+ throw new PermanentConsumerException(new Throwable("Offset out of range"));
}
setCurrentOffset(_consumeOffsets.remove());
terminateLoopIfNecessary();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionLevelConsumer.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionLevelConsumer.java
new file mode 100644
index 0000000..3695022
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionLevelConsumer.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.fakestream;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.realtime.stream.MessageBatch;
+import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.core.util.AvroUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils.*;
+
+
+/**
+ * Implementation of {@link PartitionLevelConsumer} for fake stream
+ * Unpacks the fake stream avro tar file (data/fakestream/fake_stream_avro_data.tar.gz under test resources) as source of messages
+ */
+public class FakePartitionLevelConsumer implements PartitionLevelConsumer {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FakePartitionLevelConsumer.class.getName());
+
+ private List<Integer> messageOffsets = new ArrayList<>();
+ private List<byte[]> messageBytes = new ArrayList<>();
+
+ FakePartitionLevelConsumer(int partition, StreamConfig streamConfig) {
+
+ // TODO: this logic can move to a FakeStreamProducer instead of being inside the Consumer
+ File tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName());
+ File outputDir = new File(tempDir, String.valueOf(partition));
+
+ int offset = 0;
+
+ try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(65536)) {
+ File avroFile = unpackAvroTarFile(outputDir).get(0);
+
+ int numPartitions = FakeStreamConfigUtils.getNumPartitions(streamConfig);
+
+ try (DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(avroFile)) {
+ BinaryEncoder binaryEncoder = new EncoderFactory().directBinaryEncoder(outputStream, null);
+ GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(reader.getSchema());
+
+ int recordNumber = 0;
+ for (GenericRecord genericRecord : reader) {
+ if (getPartitionNumber(recordNumber++, numPartitions) != partition) {
+ continue;
+ }
+ outputStream.reset();
+
+ datumWriter.write(genericRecord, binaryEncoder);
+ binaryEncoder.flush();
+
+ byte[] bytes = outputStream.toByteArray();
+ // contiguous offsets
+ messageOffsets.add(offset++);
+ messageBytes.add(bytes);
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("Could not create {}", FakePartitionLevelConsumer.class.getName(), e);
+ } finally {
+ FileUtils.deleteQuietly(outputDir);
+ }
+ }
+
+ @Override
+ public MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis) throws TimeoutException {
+ if (startOffset >= FakeStreamConfigUtils.getLargestOffset()) {
+ return new FakeStreamMessageBatch(Collections.emptyList(), Collections.emptyList());
+ }
+ if (startOffset < FakeStreamConfigUtils.getSmallestOffset()) {
+ startOffset = FakeStreamConfigUtils.getSmallestOffset();
+ }
+ if (endOffset > FakeStreamConfigUtils.getLargestOffset()) {
+ endOffset = FakeStreamConfigUtils.getLargestOffset();
+ }
+ return new FakeStreamMessageBatch(messageOffsets.subList((int) startOffset, (int) endOffset),
+ messageBytes.subList((int) startOffset, (int) endOffset));
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ /**
+ * Partitions the raw data
+ * This can be abstracted out and injected via stream configs to incorporate custom partitioning logic
+ */
+ public int getPartitionNumber(int recordNumber, int numPartitions) {
+ return recordNumber % numPartitions;
+ }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
new file mode 100644
index 0000000..5505b02
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.fakestream;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+
+
+/**
+ * Test helpers describing the fake stream: its resource files, offsets, partition count and
+ * ready-made {@link StreamConfig} instances.
+ * TODO: make input tar file and pinot schema configurable
+ */
+public class FakeStreamConfigUtils {
+  private static final String AVRO_TAR_FILE = "fake_stream_avro_data.tar.gz";
+  // This avro schema file must be in sync with the avro data
+  private static final String AVRO_SCHEMA_FILE = "fake_stream_avro_schema.avsc";
+  private static final String PINOT_SCHEMA_FILE = "fake_stream_pinot_schema.json";
+
+  private static final int SMALLEST_OFFSET = 0;
+  private static final int LARGEST_OFFSET = Integer.MAX_VALUE;
+  private static final String NUM_PARTITIONS_KEY = "num.partitions";
+  private static final int DEFAULT_NUM_PARTITIONS = 2;
+
+  private static final String STREAM_TYPE = "fakeStream";
+  private static final String TOPIC_NAME = "fakeTopic";
+  private static final String CONSUMER_FACTORY_CLASS = FakeStreamConsumerFactory.class.getName();
+  private static final String OFFSET_CRITERIA = "smallest";
+  private static final String DECODER_CLASS = FakeStreamMessageDecoder.class.getName();
+  private static final int SEGMENT_FLUSH_THRESHOLD_ROWS = 500;
+
+  /**
+   * Reads the number of partitions from the given stream config.
+   *
+   * @return the value stored under the key built from the config's stream type and "num.partitions",
+   *         or the default number of partitions when that key is absent
+   */
+  static int getNumPartitions(StreamConfig streamConfig) {
+    Map<String, String> configs = streamConfig.getStreamConfigsMap();
+    String key = StreamConfigProperties.constructStreamProperty(streamConfig.getType(), NUM_PARTITIONS_KEY);
+    if (!configs.containsKey(key)) {
+      return DEFAULT_NUM_PARTITIONS;
+    }
+    return Integer.parseInt(configs.get(key));
+  }
+
+  /**
+   * Returns the smallest offset of the fake stream (a fixed constant)
+   */
+  static int getSmallestOffset() {
+    return SMALLEST_OFFSET;
+  }
+
+  /**
+   * Returns the largest offset of the fake stream (a fixed constant, not derived from the data)
+   */
+  static int getLargestOffset() {
+    return LARGEST_OFFSET;
+  }
+
+  /**
+   * Unpacks the avro tar file into the given directory, deleting the directory first if it exists
+   */
+  static List<File> unpackAvroTarFile(File outputDir) throws Exception {
+    if (outputDir.exists()) {
+      FileUtils.deleteDirectory(outputDir);
+    }
+    return TarGzCompressionUtils.unTar(getResourceFile(AVRO_TAR_FILE), outputDir);
+  }
+
+  /**
+   * Returns the avro schema file
+   */
+  static File getAvroSchemaFile() {
+    return getResourceFile(AVRO_SCHEMA_FILE);
+  }
+
+  /**
+   * Loads the pinot schema from its resource file
+   */
+  static Schema getPinotSchema() throws IOException {
+    return Schema.fromFile(getResourceFile(PINOT_SCHEMA_FILE));
+  }
+
+  /**
+   * Resolves a file name against the "data/fakestream" resource directory
+   */
+  private static File getResourceFile(String fileName) {
+    URL resourceDirUrl = FakeStreamConfigUtils.class.getClassLoader().getResource("data/fakestream");
+    Assert.assertNotNull(resourceDirUrl);
+    File resourceDir = new File(TestUtils.getFileFromResourceUrl(resourceDirUrl));
+    return new File(resourceDir, fileName);
+  }
+
+  /**
+   * Builds the config key for a property of the fake stream type
+   */
+  private static String fakeStreamProperty(String property) {
+    return StreamConfigProperties.constructStreamProperty(STREAM_TYPE, property);
+  }
+
+  /**
+   * Generate fake stream configs for low level stream with custom number of partitions
+   */
+  public static StreamConfig getDefaultLowLevelStreamConfigs(int numPartitions) {
+    Map<String, String> configs = getDefaultStreamConfigs();
+    configs.put(fakeStreamProperty(StreamConfigProperties.STREAM_CONSUMER_TYPES),
+        StreamConfig.ConsumerType.LOWLEVEL.toString());
+    configs.put(fakeStreamProperty(NUM_PARTITIONS_KEY), String.valueOf(numPartitions));
+    return new StreamConfig(configs);
+  }
+
+  /**
+   * Generate fake stream configs for low level stream
+   */
+  public static StreamConfig getDefaultLowLevelStreamConfigs() {
+    return getDefaultLowLevelStreamConfigs(DEFAULT_NUM_PARTITIONS);
+  }
+
+  /**
+   * Generate fake stream configs for high level stream
+   */
+  public static StreamConfig getDefaultHighLevelStreamConfigs() {
+    Map<String, String> configs = getDefaultStreamConfigs();
+    configs.put(fakeStreamProperty(StreamConfigProperties.STREAM_CONSUMER_TYPES),
+        StreamConfig.ConsumerType.HIGHLEVEL.toString());
+    return new StreamConfig(configs);
+  }
+
+  /**
+   * Builds the properties shared by the low level and high level configs; the consumer type is left
+   * for the caller to add
+   */
+  private static Map<String, String> getDefaultStreamConfigs() {
+    Map<String, String> configs = new HashMap<>();
+    configs.put(StreamConfigProperties.STREAM_TYPE, STREAM_TYPE);
+    configs.put(fakeStreamProperty(StreamConfigProperties.STREAM_TOPIC_NAME), TOPIC_NAME);
+    configs.put(fakeStreamProperty(StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), CONSUMER_FACTORY_CLASS);
+    configs.put(fakeStreamProperty(StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), OFFSET_CRITERIA);
+    configs.put(fakeStreamProperty(StreamConfigProperties.STREAM_DECODER_CLASS), DECODER_CLASS);
+    configs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS,
+        Integer.toString(SEGMENT_FLUSH_THRESHOLD_ROWS));
+    return configs;
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
new file mode 100644
index 0000000..92ab3e3
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.fakestream;
+
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.realtime.stream.MessageBatch;
+import org.apache.pinot.core.realtime.stream.OffsetCriteria;
+import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
+import org.apache.pinot.core.realtime.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.core.realtime.stream.StreamDecoderProvider;
+import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
+import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
+
+
+/**
+ * Implementation of {@link StreamConsumerFactory} for a fake stream
+ * Data source is /resources/data/fakestream_avro_data.tar.gz
+ * Avro schema is /resources/data/fakestream/fake_stream_avro_schema.avsc
+ * Pinot schema is /resources/data/fakestream/fake_stream_pinot_schema.avsc
+ */
+public class FakeStreamConsumerFactory extends StreamConsumerFactory {
+
+ @Override
+ public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {
+ return new FakePartitionLevelConsumer(partition, _streamConfig);
+ }
+
+ @Override
+ public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema,
+ InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) {
+ return new FakeStreamLevelConsumer();
+ }
+
+ @Override
+ public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
+ return new FakeStreamMetadataProvider(_streamConfig);
+ }
+
+ @Override
+ public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
+ return new FakeStreamMetadataProvider(_streamConfig);
+ }
+
+ public static void main(String[] args) throws Exception {
+ String clientId = "client_id_localhost_tester";
+
+ // stream config
+ int numPartitions = 5;
+ StreamConfig streamConfig = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(numPartitions);
+
+ // stream consumer factory
+ StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+
+ // stream metadata provider
+ StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(clientId);
+ int partitionCount = streamMetadataProvider.fetchPartitionCount(10_000);
+ System.out.println(partitionCount);
+
+ // Partition metadata provider
+ int partition = 3;
+ StreamMetadataProvider partitionMetadataProvider =
+ streamConsumerFactory.createPartitionMetadataProvider(clientId, partition);
+ long partitionOffset =
+ partitionMetadataProvider.fetchPartitionOffset(OffsetCriteria.SMALLEST_OFFSET_CRITERIA, 10_000);
+ System.out.println(partitionOffset);
+
+ // Partition level consumer
+ PartitionLevelConsumer partitionLevelConsumer =
+ streamConsumerFactory.createPartitionLevelConsumer(clientId, partition);
+ MessageBatch messageBatch = partitionLevelConsumer.fetchMessages(10, 40, 10_000);
+
+ // Message decoder
+ Schema pinotSchema = FakeStreamConfigUtils.getPinotSchema();
+ StreamMessageDecoder streamMessageDecoder = StreamDecoderProvider.create(streamConfig, pinotSchema);
+ GenericRow decodedRow = new GenericRow();
+ streamMessageDecoder.decode(messageBatch.getMessageAtIndex(0), decodedRow);
+ System.out.println(decodedRow);
+ }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamLevelConsumer.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamLevelConsumer.java
new file mode 100644
index 0000000..bf16371
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamLevelConsumer.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.fakestream;
+
+import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
+
+
+/**
+ * Test implementation of {@link StreamLevelConsumer}
+ * This is currently a no-op: {@code start}, {@code commit} and {@code shutdown} have empty bodies,
+ * and {@code next} returns the row it was given without touching it
+ */
+public class FakeStreamLevelConsumer implements StreamLevelConsumer {
+ @Override
+ public void start() throws Exception { // no-op
+
+ }
+
+ @Override
+ public GenericRow next(GenericRow destination) {
+ return destination; // nothing is read from a stream; the caller's row is handed back as-is
+ }
+
+ @Override
+ public void commit() { // no-op
+
+ }
+
+ @Override
+ public void shutdown() throws Exception { // no-op
+
+ }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java
new file mode 100644
index 0000000..4afd6da
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.fakestream;
+
+import java.util.List;
+import org.apache.pinot.core.realtime.stream.MessageBatch;
+
+
+/**
+ * MessageBatch implementation for the fake stream.
+ * Backed by two lists that are read with the same index: entry {@code i} of the offsets list belongs
+ * to entry {@code i} of the payload list. The lists are stored as given, not copied.
+ */
+public class FakeStreamMessageBatch implements MessageBatch<byte[]> {
+  private final List<Integer> _messageOffsets;
+  private final List<byte[]> _messageBytes;
+
+  /**
+   * @param messageOffsets offsets of the messages in this batch
+   * @param messageBytes payloads of the messages, in the same order as {@code messageOffsets}
+   */
+  FakeStreamMessageBatch(List<Integer> messageOffsets, List<byte[]> messageBytes) {
+    _messageOffsets = messageOffsets;
+    _messageBytes = messageBytes;
+  }
+
+  /**
+   * Returns the number of messages in this batch
+   */
+  public int getMessageCount() {
+    return _messageOffsets.size();
+  }
+
+  /**
+   * Returns the payload of the message at the given index
+   */
+  public byte[] getMessageAtIndex(int index) {
+    return _messageBytes.get(index);
+  }
+
+  /**
+   * Returns the stream offset of the message at the given index
+   */
+  public int getMessageOffsetAtIndex(int index) {
+    return _messageOffsets.get(index);
+  }
+
+  /**
+   * Returns the payload length, in bytes, of the message at the given index
+   */
+  public int getMessageLengthAtIndex(int index) {
+    return _messageBytes.get(index).length;
+  }
+
+  /**
+   * Returns the offset that follows the message at the given index
+   */
+  public long getNextStreamMessageOffsetAtIndex(int index) {
+    // Add as long: an int addition would wrap to a negative value for an offset of Integer.MAX_VALUE
+    return _messageOffsets.get(index) + 1L;
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java
new file mode 100644
index 0000000..946fbbc
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.fakestream;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.realtime.impl.kafka.AvroRecordToPinotRowGenerator;
+import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * StreamMessageDecoder implementation for fake stream
+ */
+public class FakeStreamMessageDecoder implements StreamMessageDecoder<byte[]> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FakeStreamMessageDecoder.class);
+
+ private org.apache.avro.Schema _avroSchema;
+ private DatumReader<GenericData.Record> _datumReader;
+ private AvroRecordToPinotRowGenerator _avroRecordConverter;
+ private BinaryDecoder _binaryDecoderToReuse;
+ private GenericData.Record _avroRecordToReuse;
+
+ @Override
+ public void init(Map props, Schema indexingSchema, String topicName) throws Exception {
+ _avroSchema = new org.apache.avro.Schema.Parser().parse(FakeStreamConfigUtils.getAvroSchemaFile());
+ _datumReader = new GenericDatumReader<>(_avroSchema);
+ _avroRecordConverter = new AvroRecordToPinotRowGenerator(indexingSchema);
+ }
+
+ @Override
+ public GenericRow decode(byte[] payload, GenericRow destination) {
+ return decode(payload, 0, payload.length, destination);
+ }
+
+ @Override
+ public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
+ _binaryDecoderToReuse = DecoderFactory.get().binaryDecoder(payload, offset, length, _binaryDecoderToReuse);
+ try {
+ _avroRecordToReuse = _datumReader.read(_avroRecordToReuse, _binaryDecoderToReuse);
+ } catch (IOException e) {
+ LOGGER.error("Caught exception while reading message using schema: {}", _avroSchema, e);
+ return null;
+ }
+ return _avroRecordConverter.transform(_avroRecordToReuse, destination);
+ }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
new file mode 100644
index 0000000..a75660d
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.fakestream;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nonnull;
+import org.apache.pinot.core.realtime.stream.OffsetCriteria;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
+import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
+
+
+/**
+ * StreamMetadataProvider implementation for the fake stream.
+ * The partition count comes from the stream config; offsets are the fixed values exposed by
+ * {@link FakeStreamConfigUtils}.
+ */
+public class FakeStreamMetadataProvider implements StreamMetadataProvider {
+  private final int _numPartitions;
+
+  public FakeStreamMetadataProvider(StreamConfig streamConfig) {
+    _numPartitions = FakeStreamConfigUtils.getNumPartitions(streamConfig);
+  }
+
+  /**
+   * Returns the partition count read at construction time; {@code timeoutMillis} is not used
+   */
+  @Override
+  public int fetchPartitionCount(long timeoutMillis) {
+    return _numPartitions;
+  }
+
+  /**
+   * Returns the smallest offset when the criteria is "smallest", and the largest offset for any other
+   * criteria; {@code timeoutMillis} is not used
+   */
+  @Override
+  public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) throws TimeoutException {
+    return offsetCriteria.isSmallest()
+        ? FakeStreamConfigUtils.getSmallestOffset()
+        : FakeStreamConfigUtils.getLargestOffset();
+  }
+
+  /**
+   * No-op
+   */
+  @Override
+  public void close() throws IOException {
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
index 3345487..45acc92 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
@@ -22,8 +22,9 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.utils.DataSize;
import org.apache.pinot.common.utils.time.TimeUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaAvroMessageDecoder;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -37,12 +38,13 @@ public class StreamConfigTest {
public void testStreamConfig() {
boolean exception = false;
StreamConfig streamConfig;
- String streamType = "kafka";
- String topic = "aTopic";
+ String streamType = "fakeStream";
+ String topic = "fakeTopic";
String consumerType = StreamConfig.ConsumerType.LOWLEVEL.toString();
- String consumerFactoryClassName = KafkaConsumerFactory.class.getName();
- String decoderClass = KafkaAvroMessageDecoder.class.getName();
+ String consumerFactoryClass = FakeStreamConsumerFactory.class.getName();
+ String decoderClass = FakeStreamMessageDecoder.class.getName();
+ // test with empty map
try {
Map<String, String> streamConfigMap = new HashMap<>();
streamConfig = new StreamConfig(streamConfigMap);
@@ -54,18 +56,16 @@ public class StreamConfigTest {
// All mandatory properties set
Map<String, String> streamConfigMap = new HashMap<>();
streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
- topic);
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
- consumerType);
- streamConfigMap.put(StreamConfigProperties
- .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
- consumerFactoryClassName);
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
- decoderClass);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), topic);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ consumerType);
+ streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+ StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), consumerFactoryClass);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+ decoderClass);
streamConfig = new StreamConfig(streamConfigMap);
// Missing streamType
@@ -80,8 +80,8 @@ public class StreamConfigTest {
// Missing stream topic
streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
- streamConfigMap
- .remove(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME));
+ streamConfigMap.remove(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME));
exception = false;
try {
streamConfig = new StreamConfig(streamConfigMap);
@@ -91,9 +91,8 @@ public class StreamConfigTest {
Assert.assertTrue(exception);
// Missing consumer type
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
- topic);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), topic);
streamConfigMap.remove(
StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES));
exception = false;
@@ -105,11 +104,11 @@ public class StreamConfigTest {
Assert.assertTrue(exception);
// Missing consumer factory
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
- consumerType);
- streamConfigMap.remove(StreamConfigProperties
- .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS));
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ consumerType);
+ streamConfigMap.remove(StreamConfigProperties.constructStreamProperty(streamType,
+ StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS));
exception = false;
try {
streamConfig = new StreamConfig(streamConfigMap);
@@ -120,9 +119,8 @@ public class StreamConfigTest {
Assert.assertEquals(streamConfig.getConsumerFactoryClassName(), StreamConfig.getDefaultConsumerFactoryClassName());
// Missing decoder class
- streamConfigMap.put(StreamConfigProperties
- .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
- consumerFactoryClassName);
+ streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+ StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), consumerFactoryClass);
streamConfigMap.remove(
StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS));
exception = false;
@@ -133,14 +131,14 @@ public class StreamConfigTest {
}
Assert.assertTrue(exception);
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
- decoderClass);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+ decoderClass);
streamConfig = new StreamConfig(streamConfigMap);
Assert.assertEquals(streamConfig.getType(), streamType);
Assert.assertEquals(streamConfig.getTopicName(), topic);
Assert.assertEquals(streamConfig.getConsumerTypes().get(0), StreamConfig.ConsumerType.LOWLEVEL);
- Assert.assertEquals(streamConfig.getConsumerFactoryClassName(), consumerFactoryClassName);
+ Assert.assertEquals(streamConfig.getConsumerFactoryClassName(), consumerFactoryClass);
Assert.assertEquals(streamConfig.getDecoderClass(), decoderClass);
}
@@ -149,44 +147,42 @@ public class StreamConfigTest {
*/
@Test
public void testStreamConfigDefaults() {
- String streamType = "kafka";
- String topic = "aTopic";
+ String streamType = "fakeStream";
+ String topic = "fakeTopic";
String consumerType = "simple";
- String consumerFactoryClassName = KafkaConsumerFactory.class.getName();
- String decoderClass = KafkaAvroMessageDecoder.class.getName();
+ String consumerFactoryClass = FakeStreamConsumerFactory.class.getName();
+ String decoderClass = FakeStreamMessageDecoder.class.getName();
Map<String, String> streamConfigMap = new HashMap<>();
streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
- topic);
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
- consumerType);
- streamConfigMap.put(StreamConfigProperties
- .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
- consumerFactoryClassName);
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
- decoderClass);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), topic);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ consumerType);
+ streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+ StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), consumerFactoryClass);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+ decoderClass);
// Mandatory values + defaults
StreamConfig streamConfig = new StreamConfig(streamConfigMap);
Assert.assertEquals(streamConfig.getType(), streamType);
Assert.assertEquals(streamConfig.getTopicName(), topic);
Assert.assertEquals(streamConfig.getConsumerTypes().get(0), StreamConfig.ConsumerType.LOWLEVEL);
- Assert.assertEquals(streamConfig.getConsumerFactoryClassName(), consumerFactoryClassName);
+ Assert.assertEquals(streamConfig.getConsumerFactoryClassName(), consumerFactoryClass);
Assert.assertEquals(streamConfig.getDecoderClass(), decoderClass);
Assert.assertEquals(streamConfig.getDecoderProperties().size(), 0);
- Assert
- .assertEquals(streamConfig.getOffsetCriteria(), new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest());
- Assert
- .assertEquals(streamConfig.getConnectionTimeoutMillis(), StreamConfig.DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS);
+ Assert.assertEquals(streamConfig.getOffsetCriteria(),
+ new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest());
+ Assert.assertEquals(streamConfig.getConnectionTimeoutMillis(),
+ StreamConfig.DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS);
Assert.assertEquals(streamConfig.getFetchTimeoutMillis(), StreamConfig.DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS);
Assert.assertEquals(streamConfig.getFlushThresholdRows(), StreamConfig.getDefaultFlushThresholdRows());
Assert.assertEquals(streamConfig.getFlushThresholdTimeMillis(), StreamConfig.getDefaultFlushThresholdTimeMillis());
- Assert
- .assertEquals(streamConfig.getFlushSegmentDesiredSizeBytes(), StreamConfig.getDefaultDesiredSegmentSizeBytes());
+ Assert.assertEquals(streamConfig.getFlushSegmentDesiredSizeBytes(),
+ StreamConfig.getDefaultDesiredSegmentSizeBytes());
consumerType = "lowLevel,highLevel";
String offsetCriteria = "smallest";
@@ -197,17 +193,16 @@ public class StreamConfigTest {
String flushThresholdTime = "2h";
String flushThresholdRows = "500";
String flushSegmentSize = "20M";
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
- consumerType);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ consumerType);
streamConfigMap.put(
StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.DECODER_PROPS_PREFIX) + "."
+ decoderProp1Key, decoderProp1Value);
- streamConfigMap.put(StreamConfigProperties
- .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), offsetCriteria);
- streamConfigMap.put(StreamConfigProperties
- .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONNECTION_TIMEOUT_MILLIS),
- connectionTimeout);
+ streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+ StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), offsetCriteria);
+ streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+ StreamConfigProperties.STREAM_CONNECTION_TIMEOUT_MILLIS), connectionTimeout);
streamConfigMap.put(
StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS),
fetchTimeout);
@@ -220,11 +215,11 @@ public class StreamConfigTest {
Assert.assertEquals(streamConfig.getTopicName(), topic);
Assert.assertEquals(streamConfig.getConsumerTypes().get(0), StreamConfig.ConsumerType.LOWLEVEL);
Assert.assertEquals(streamConfig.getConsumerTypes().get(1), StreamConfig.ConsumerType.HIGHLEVEL);
- Assert.assertEquals(streamConfig.getConsumerFactoryClassName(), consumerFactoryClassName);
+ Assert.assertEquals(streamConfig.getConsumerFactoryClassName(), consumerFactoryClass);
Assert.assertEquals(streamConfig.getDecoderClass(), decoderClass);
Assert.assertEquals(streamConfig.getDecoderProperties().size(), 1);
Assert.assertEquals(streamConfig.getDecoderProperties().get(decoderProp1Key), decoderProp1Value);
- Assert.assertEquals(streamConfig.getOffsetCriteria().isSmallest(), true);
+ Assert.assertTrue(streamConfig.getOffsetCriteria().isSmallest());
Assert.assertEquals(streamConfig.getConnectionTimeoutMillis(), Long.parseLong(connectionTimeout));
Assert.assertEquals(streamConfig.getFetchTimeoutMillis(), Integer.parseInt(fetchTimeout));
Assert.assertEquals(streamConfig.getFlushThresholdRows(), Integer.parseInt(flushThresholdRows));
@@ -250,33 +245,31 @@ public class StreamConfigTest {
public void testStreamConfigValidations() {
boolean exception;
StreamConfig streamConfig;
- String streamType = "kafka";
- String topic = "aTopic";
+ String streamType = "fakeStream";
+ String topic = "fakeTopic";
String consumerType = "simple";
- String consumerFactoryClassName = KafkaConsumerFactory.class.getName();
- String decoderClass = KafkaAvroMessageDecoder.class.getName();
+ String consumerFactoryClass = FakeStreamConsumerFactory.class.getName();
+ String decoderClass = FakeStreamMessageDecoder.class.getName();
// All mandatory properties set
Map<String, String> streamConfigMap = new HashMap<>();
streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
- topic);
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
- consumerType);
- streamConfigMap.put(StreamConfigProperties
- .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
- consumerFactoryClassName);
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
- decoderClass);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), topic);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ consumerType);
+ streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+ StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), consumerFactoryClass);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+ decoderClass);
streamConfig = new StreamConfig(streamConfigMap);
// Invalid consumer type
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
- "invalidConsumerType");
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ "invalidConsumerType");
exception = false;
try {
streamConfig = new StreamConfig(streamConfigMap);
@@ -286,9 +279,9 @@ public class StreamConfigTest {
Assert.assertTrue(exception);
// Invalid fetch timeout
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
- consumerType);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ consumerType);
streamConfigMap.put(
StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS),
"timeout");
@@ -298,15 +291,15 @@ public class StreamConfigTest {
// Invalid connection timeout
streamConfigMap.remove(
StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS));
- streamConfigMap.put(StreamConfigProperties
- .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONNECTION_TIMEOUT_MILLIS), "timeout");
+ streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+ StreamConfigProperties.STREAM_CONNECTION_TIMEOUT_MILLIS), "timeout");
streamConfig = new StreamConfig(streamConfigMap);
- Assert
- .assertEquals(streamConfig.getConnectionTimeoutMillis(), StreamConfig.DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS);
+ Assert.assertEquals(streamConfig.getConnectionTimeoutMillis(),
+ StreamConfig.DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS);
// Invalid flush threshold rows
- streamConfigMap.remove(StreamConfigProperties
- .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONNECTION_TIMEOUT_MILLIS));
+ streamConfigMap.remove(StreamConfigProperties.constructStreamProperty(streamType,
+ StreamConfigProperties.STREAM_CONNECTION_TIMEOUT_MILLIS));
streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, "rows");
streamConfig = new StreamConfig(streamConfigMap);
Assert.assertEquals(streamConfig.getFlushThresholdRows(), StreamConfig.getDefaultFlushThresholdRows());
@@ -321,8 +314,8 @@ public class StreamConfigTest {
streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME);
streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_DESIRED_SIZE, "size");
streamConfig = new StreamConfig(streamConfigMap);
- Assert
- .assertEquals(streamConfig.getFlushSegmentDesiredSizeBytes(), StreamConfig.getDefaultDesiredSegmentSizeBytes());
+ Assert.assertEquals(streamConfig.getFlushSegmentDesiredSizeBytes(),
+ StreamConfig.getDefaultDesiredSegmentSizeBytes());
}
/**
@@ -331,11 +324,11 @@ public class StreamConfigTest {
@Test
public void testFlushThresholdStreamConfigs() {
StreamConfig streamConfig;
- String streamType = "kafka";
- String topic = "aTopic";
+ String streamType = "fakeStream";
+ String topic = "fakeTopic";
String consumerType = "lowlevel";
- String consumerFactoryClassName = KafkaConsumerFactory.class.getName();
- String decoderClass = KafkaAvroMessageDecoder.class.getName();
+ String consumerFactoryClass = FakeStreamConsumerFactory.class.getName();
+ String decoderClass = FakeStreamMessageDecoder.class.getName();
String flushThresholdRows = "200";
String flushThresholdRowsLLC = "400";
String flushThresholdTime = "2h";
@@ -343,18 +336,16 @@ public class StreamConfigTest {
Map<String, String> streamConfigMap = new HashMap<>();
streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
- topic);
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
- consumerType);
- streamConfigMap.put(StreamConfigProperties
- .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
- consumerFactoryClassName);
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
- decoderClass);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), topic);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ consumerType);
+ streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+ StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), consumerFactoryClass);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+ decoderClass);
// use defaults if nothing provided
streamConfig = new StreamConfig(streamConfigMap);
@@ -406,62 +397,60 @@ public class StreamConfigTest {
streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS);
streamConfigMap.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME);
partitionLevelStreamConfig = new PartitionLevelStreamConfig(streamConfigMap);
- Assert
- .assertEquals(partitionLevelStreamConfig.getFlushThresholdRows(), StreamConfig.getDefaultFlushThresholdRows());
+ Assert.assertEquals(partitionLevelStreamConfig.getFlushThresholdRows(),
+ StreamConfig.getDefaultFlushThresholdRows());
Assert.assertEquals(partitionLevelStreamConfig.getFlushThresholdTimeMillis(),
StreamConfig.getDefaultFlushThresholdTimeMillis());
}
@Test
public void testConsumerTypes() {
- String streamType = "kafka";
- String topic = "aTopic";
- String consumerFactoryClassName = KafkaConsumerFactory.class.getName();
- String decoderClass = KafkaAvroMessageDecoder.class.getName();
+ String streamType = "fakeStream";
+ String topic = "fakeTopic";
+ String consumerFactoryClass = FakeStreamConsumerFactory.class.getName();
+ String decoderClass = FakeStreamMessageDecoder.class.getName();
Map<String, String> streamConfigMap = new HashMap<>();
streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
- topic);
- streamConfigMap.put(StreamConfigProperties
- .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
- consumerFactoryClassName);
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
- decoderClass);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), topic);
+ streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+ StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), consumerFactoryClass);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+ decoderClass);
String consumerType = "simple";
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
- consumerType);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ consumerType);
StreamConfig streamConfig = new StreamConfig(streamConfigMap);
Assert.assertEquals(streamConfig.getConsumerTypes().get(0), StreamConfig.ConsumerType.LOWLEVEL);
Assert.assertTrue(streamConfig.hasLowLevelConsumerType());
Assert.assertFalse(streamConfig.hasHighLevelConsumerType());
consumerType = "lowlevel";
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
- consumerType);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ consumerType);
streamConfig = new StreamConfig(streamConfigMap);
Assert.assertEquals(streamConfig.getConsumerTypes().get(0), StreamConfig.ConsumerType.LOWLEVEL);
Assert.assertTrue(streamConfig.hasLowLevelConsumerType());
Assert.assertFalse(streamConfig.hasHighLevelConsumerType());
consumerType = "highLevel";
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
- consumerType);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ consumerType);
streamConfig = new StreamConfig(streamConfigMap);
Assert.assertEquals(streamConfig.getConsumerTypes().get(0), StreamConfig.ConsumerType.HIGHLEVEL);
Assert.assertFalse(streamConfig.hasLowLevelConsumerType());
Assert.assertTrue(streamConfig.hasHighLevelConsumerType());
consumerType = "highLevel,simple";
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
- consumerType);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ consumerType);
streamConfig = new StreamConfig(streamConfigMap);
Assert.assertEquals(streamConfig.getConsumerTypes().get(0), StreamConfig.ConsumerType.HIGHLEVEL);
Assert.assertEquals(streamConfig.getConsumerTypes().get(1), StreamConfig.ConsumerType.LOWLEVEL);
@@ -469,9 +458,9 @@ public class StreamConfigTest {
Assert.assertTrue(streamConfig.hasHighLevelConsumerType());
consumerType = "highLevel,lowlevel";
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
- consumerType);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ consumerType);
streamConfig = new StreamConfig(streamConfigMap);
Assert.assertEquals(streamConfig.getConsumerTypes().get(0), StreamConfig.ConsumerType.HIGHLEVEL);
Assert.assertEquals(streamConfig.getConsumerTypes().get(1), StreamConfig.ConsumerType.LOWLEVEL);
diff --git a/pinot-core/src/test/resources/data/fakestream/fake_stream_avro_data.tar.gz b/pinot-core/src/test/resources/data/fakestream/fake_stream_avro_data.tar.gz
new file mode 100644
index 0000000..994ad9f
Binary files /dev/null and b/pinot-core/src/test/resources/data/fakestream/fake_stream_avro_data.tar.gz differ
diff --git a/pinot-core/src/test/resources/data/fakestream/fake_stream_avro_schema.avsc b/pinot-core/src/test/resources/data/fakestream/fake_stream_avro_schema.avsc
new file mode 100644
index 0000000..0213a04
--- /dev/null
+++ b/pinot-core/src/test/resources/data/fakestream/fake_stream_avro_schema.avsc
@@ -0,0 +1,524 @@
+{
+ "type": "record",
+ "name": "Flight",
+ "namespace": "pinot",
+ "fields": [
+ {
+ "name": "DaysSinceEpoch",
+ "type": [
+ "int"
+ ]
+ },
+ {
+ "name": "Year",
+ "type": [
+ "int"
+ ]
+ },
+ {
+ "name": "Quarter",
+ "type": [
+ "int"
+ ]
+ },
+ {
+ "name": "Month",
+ "type": [
+ "int"
+ ]
+ },
+ {
+ "name": "DayofMonth",
+ "type": [
+ "int"
+ ]
+ },
+ {
+ "name": "DayOfWeek",
+ "type": [
+ "int"
+ ]
+ },
+ {
+ "name": "FlightDate",
+ "type": [
+ "string"
+ ]
+ },
+ {
+ "name": "UniqueCarrier",
+ "type": [
+ "string"
+ ]
+ },
+ {
+ "name": "AirlineID",
+ "type": [
+ "long"
+ ]
+ },
+ {
+ "name": "Carrier",
+ "type": [
+ "string"
+ ]
+ },
+ {
+ "name": "TailNum",
+ "type": [
+ "string",
+ "null"
+ ]
+ },
+ {
+ "name": "FlightNum",
+ "type": [
+ "int"
+ ]
+ },
+ {
+ "name": "OriginAirportID",
+ "type": [
+ "int"
+ ]
+ },
+ {
+ "name": "OriginAirportSeqID",
+ "type": [
+ "int"
+ ]
+ },
+ {
+ "name": "OriginCityMarketID",
+ "type": [
+ "int"
+ ]
+ },
+ {
+ "name": "Origin",
+ "type": [
+ "string"
+ ]
+ },
+ {
+ "name": "OriginCityName",
+ "type": [
+ "string"
+ ]
+ },
+ {
+ "name": "OriginState",
+ "type": [
+ "string"
+ ]
+ },
+ {
+ "name": "OriginStateFips",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "OriginStateName",
+ "type": [
+ "string"
+ ]
+ },
+ {
+ "name": "OriginWac",
+ "type": [
+ "int"
+ ]
+ },
+ {
+ "name": "DestAirportID",
+ "type": [
+ "int"
+ ]
+ },
+ {
+ "name": "DestAirportSeqID",
+ "type": [
+ "int"
+ ]
+ },
+ {
+ "name": "DestCityMarketID",
+ "type": [
+ "int"
+ ]
+ },
+ {
+ "name": "Dest",
+ "type": [
+ "string"
+ ]
+ },
+ {
+ "name": "DestCityName",
+ "type": [
+ "string"
+ ]
+ },
+ {
+ "name": "DestState",
+ "type": [
+ "string"
+ ]
+ },
+ {
+ "name": "DestStateFips",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "DestStateName",
+ "type": [
+ "string"
+ ]
+ },
+ {
+ "name": "DestWac",
+ "type": [
+ "int"
+ ]
+ },
+ {
+ "name": "CRSDepTime",
+ "type": [
+ "int"
+ ]
+ },
+ {
+ "name": "DepTime",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "DepDelay",
+ "type": [
+ "double",
+ "null"
+ ]
+ },
+ {
+ "name": "DepDelayMinutes",
+ "type": [
+ "float",
+ "null"
+ ]
+ },
+ {
+ "name": "DepDel15",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "DepartureDelayGroups",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "DepTimeBlk",
+ "type": [
+ "string"
+ ]
+ },
+ {
+ "name": "TaxiOut",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "WheelsOff",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "WheelsOn",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "TaxiIn",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "CRSArrTime",
+ "type": [
+ "int"
+ ]
+ },
+ {
+ "name": "ArrTime",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "ArrDelay",
+ "type": [
+ "float",
+ "null"
+ ]
+ },
+ {
+ "name": "ArrDelayMinutes",
+ "type": [
+ "double",
+ "null"
+ ]
+ },
+ {
+ "name": "ArrDel15",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "ArrivalDelayGroups",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "ArrTimeBlk",
+ "type": [
+ "string"
+ ]
+ },
+ {
+ "name": "Cancelled",
+ "type": [
+ "int"
+ ]
+ },
+ {
+ "name": "CancellationCode",
+ "type": [
+ "string",
+ "null"
+ ]
+ },
+ {
+ "name": "Diverted",
+ "type": [
+ "int"
+ ]
+ },
+ {
+ "name": "CRSElapsedTime",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "ActualElapsedTime",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "AirTime",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "Flights",
+ "type": [
+ "int"
+ ]
+ },
+ {
+ "name": "Distance",
+ "type": [
+ "int"
+ ]
+ },
+ {
+ "name": "DistanceGroup",
+ "type": [
+ "int"
+ ]
+ },
+ {
+ "name": "CarrierDelay",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "WeatherDelay",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "NASDelay",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "SecurityDelay",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "LateAircraftDelay",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "FirstDepTime",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "TotalAddGTime",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "LongestAddGTime",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "DivAirportLandings",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "DivReachedDest",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "DivActualElapsedTime",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "DivArrDelay",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "DivDistance",
+ "type": [
+ "int",
+ "null"
+ ]
+ },
+ {
+ "name": "DivAirports",
+ "type": {
+ "type": "array",
+ "items": "string"
+ }
+ },
+ {
+ "name": "DivAirportIDs",
+ "type": {
+ "type": "array",
+ "items": "int"
+ }
+ },
+ {
+ "name": "DivAirportSeqIDs",
+ "type": {
+ "type": "array",
+ "items": "int"
+ }
+ },
+ {
+ "name": "DivWheelsOns",
+ "type": {
+ "type": "array",
+ "items": "int"
+ }
+ },
+ {
+ "name": "DivTotalGTimes",
+ "type": {
+ "type": "array",
+ "items": "long"
+ }
+ },
+ {
+ "name": "DivLongestGTimes",
+ "type": {
+ "type": "array",
+ "items": "float"
+ }
+ },
+ {
+ "name": "DivWheelsOffs",
+ "type": {
+ "type": "array",
+ "items": "int"
+ }
+ },
+ {
+ "name": "DivTailNums",
+ "type": {
+ "type": "array",
+ "items": "string"
+ }
+ },
+ {
+ "name": "RandomAirports",
+ "type": {
+ "type": "array",
+ "items": "string"
+ }
+ }
+ ]
+}
diff --git a/pinot-core/src/test/resources/data/fakestream/fake_stream_pinot_schema.json b/pinot-core/src/test/resources/data/fakestream/fake_stream_pinot_schema.json
new file mode 100644
index 0000000..f17cb53
--- /dev/null
+++ b/pinot-core/src/test/resources/data/fakestream/fake_stream_pinot_schema.json
@@ -0,0 +1,335 @@
+{
+ "schemaName": "mytable",
+ "dimensionFieldSpecs": [
+ {
+ "name": "AirlineID",
+ "dataType": "LONG"
+ },
+ {
+ "name": "ArrTime",
+ "dataType": "INT"
+ },
+ {
+ "name": "ArrTimeBlk",
+ "dataType": "STRING"
+ },
+ {
+ "name": "CRSArrTime",
+ "dataType": "INT"
+ },
+ {
+ "name": "CRSDepTime",
+ "dataType": "INT"
+ },
+ {
+ "name": "CRSElapsedTime",
+ "dataType": "INT"
+ },
+ {
+ "name": "CancellationCode",
+ "dataType": "STRING"
+ },
+ {
+ "name": "Carrier",
+ "dataType": "STRING"
+ },
+ {
+ "name": "DayOfWeek",
+ "dataType": "INT"
+ },
+ {
+ "name": "DayofMonth",
+ "dataType": "INT"
+ },
+ {
+ "name": "DepTime",
+ "dataType": "INT"
+ },
+ {
+ "name": "DepTimeBlk",
+ "dataType": "STRING"
+ },
+ {
+ "name": "Dest",
+ "dataType": "STRING"
+ },
+ {
+ "name": "DestAirportID",
+ "dataType": "INT"
+ },
+ {
+ "name": "DestAirportSeqID",
+ "dataType": "INT"
+ },
+ {
+ "name": "DestCityMarketID",
+ "dataType": "INT"
+ },
+ {
+ "name": "DestCityName",
+ "dataType": "STRING"
+ },
+ {
+ "name": "DestState",
+ "dataType": "STRING"
+ },
+ {
+ "name": "DestStateFips",
+ "dataType": "INT"
+ },
+ {
+ "name": "DestStateName",
+ "dataType": "STRING"
+ },
+ {
+ "name": "DestWac",
+ "dataType": "INT"
+ },
+ {
+ "name": "Distance",
+ "dataType": "INT"
+ },
+ {
+ "name": "DistanceGroup",
+ "dataType": "INT"
+ },
+ {
+ "name": "DivActualElapsedTime",
+ "dataType": "INT"
+ },
+ {
+ "name": "DivAirportIDs",
+ "dataType": "INT",
+ "singleValueField": false
+ },
+ {
+ "name": "DivAirportLandings",
+ "dataType": "INT"
+ },
+ {
+ "name": "DivAirportSeqIDs",
+ "dataType": "INT",
+ "singleValueField": false
+ },
+ {
+ "name": "DivAirports",
+ "dataType": "STRING",
+ "singleValueField": false
+ },
+ {
+ "name": "DivArrDelay",
+ "dataType": "INT"
+ },
+ {
+ "name": "DivDistance",
+ "dataType": "INT"
+ },
+ {
+ "name": "DivLongestGTimes",
+ "dataType": "FLOAT",
+ "singleValueField": false
+ },
+ {
+ "name": "DivReachedDest",
+ "dataType": "INT"
+ },
+ {
+ "name": "DivTailNums",
+ "dataType": "STRING",
+ "singleValueField": false
+ },
+ {
+ "name": "DivTotalGTimes",
+ "dataType": "LONG",
+ "singleValueField": false
+ },
+ {
+ "name": "DivWheelsOffs",
+ "dataType": "INT",
+ "singleValueField": false
+ },
+ {
+ "name": "DivWheelsOns",
+ "dataType": "INT",
+ "singleValueField": false
+ },
+ {
+ "name": "Diverted",
+ "dataType": "INT"
+ },
+ {
+ "name": "FirstDepTime",
+ "dataType": "INT"
+ },
+ {
+ "name": "FlightDate",
+ "dataType": "STRING"
+ },
+ {
+ "name": "FlightNum",
+ "dataType": "INT"
+ },
+ {
+ "name": "Flights",
+ "dataType": "INT"
+ },
+ {
+ "name": "LongestAddGTime",
+ "dataType": "INT"
+ },
+ {
+ "name": "Month",
+ "dataType": "INT"
+ },
+ {
+ "name": "Origin",
+ "dataType": "STRING"
+ },
+ {
+ "name": "OriginAirportID",
+ "dataType": "INT"
+ },
+ {
+ "name": "OriginAirportSeqID",
+ "dataType": "INT"
+ },
+ {
+ "name": "OriginCityMarketID",
+ "dataType": "INT"
+ },
+ {
+ "name": "OriginCityName",
+ "dataType": "STRING"
+ },
+ {
+ "name": "OriginState",
+ "dataType": "STRING"
+ },
+ {
+ "name": "OriginStateFips",
+ "dataType": "INT"
+ },
+ {
+ "name": "OriginStateName",
+ "dataType": "STRING"
+ },
+ {
+ "name": "OriginWac",
+ "dataType": "INT"
+ },
+ {
+ "name": "Quarter",
+ "dataType": "INT"
+ },
+ {
+ "name": "RandomAirports",
+ "dataType": "STRING",
+ "singleValueField": false
+ },
+ {
+ "name": "TailNum",
+ "dataType": "STRING"
+ },
+ {
+ "name": "TaxiIn",
+ "dataType": "INT"
+ },
+ {
+ "name": "TaxiOut",
+ "dataType": "INT"
+ },
+ {
+ "name": "Year",
+ "dataType": "INT"
+ },
+ {
+ "name": "WheelsOn",
+ "dataType": "INT"
+ },
+ {
+ "name": "WheelsOff",
+ "dataType": "INT"
+ },
+ {
+ "name": "UniqueCarrier",
+ "dataType": "STRING"
+ },
+ {
+ "name": "TotalAddGTime",
+ "dataType": "INT"
+ }
+ ],
+ "metricFieldSpecs": [
+ {
+ "name": "ActualElapsedTime",
+ "dataType": "INT"
+ },
+ {
+ "name": "AirTime",
+ "dataType": "INT"
+ },
+ {
+ "name": "ArrDel15",
+ "dataType": "INT"
+ },
+ {
+ "name": "ArrDelay",
+ "dataType": "FLOAT"
+ },
+ {
+ "name": "ArrivalDelayGroups",
+ "dataType": "INT"
+ },
+ {
+ "name": "ArrDelayMinutes",
+ "dataType": "DOUBLE"
+ },
+ {
+ "name": "Cancelled",
+ "dataType": "INT"
+ },
+ {
+ "name": "CarrierDelay",
+ "dataType": "INT"
+ },
+ {
+ "name": "DepDel15",
+ "dataType": "INT"
+ },
+ {
+ "name": "DepDelay",
+ "dataType": "DOUBLE"
+ },
+ {
+ "name": "DepDelayMinutes",
+ "dataType": "FLOAT"
+ },
+ {
+ "name": "DepartureDelayGroups",
+ "dataType": "INT"
+ },
+ {
+ "name": "LateAircraftDelay",
+ "dataType": "INT"
+ },
+ {
+ "name": "NASDelay",
+ "dataType": "INT"
+ },
+ {
+ "name": "SecurityDelay",
+ "dataType": "INT"
+ },
+ {
+ "name": "WeatherDelay",
+ "dataType": "INT"
+ }
+ ],
+ "timeFieldSpec": {
+ "incomingGranularitySpec": {
+ "name": "DaysSinceEpoch",
+ "dataType": "INT",
+ "timeType": "DAYS"
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org