Posted to commits@pinot.apache.org by xi...@apache.org on 2019/07/03 10:13:05 UTC

[incubator-pinot] branch kafka_2.0 created (now d9e0316)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a change to branch kafka_2.0
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at d9e0316  Adding support for Kafka 2.0

This branch includes the following new commits:

     new 3f682d0  WIP: adding kafka 2 stream provider
     new d9e0316  Adding support for Kafka 2.0

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/02: WIP: adding kafka 2 stream provider

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch kafka_2.0
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 3f682d0c454263a8ab5361d751f9f5c9b30e715a
Author: Ananth Packkildurai <ap...@slack-corp.com>
AuthorDate: Wed Jun 12 18:31:22 2019 -0700

    WIP: adding kafka 2 stream provider
---
 .../pinot-connector-kafka-2.0/README.md            |  24 ++++
 pinot-connectors/pinot-connector-kafka-2.0/pom.xml |  67 ++++++++++
 .../impl/kafka2/KafkaConnectionHandler.java        |  61 +++++++++
 .../realtime/impl/kafka2/KafkaConsumerFactory.java |  49 +++++++
 .../realtime/impl/kafka2/KafkaMessageBatch.java    |  65 ++++++++++
 .../impl/kafka2/KafkaPartitionConsumer.java        |  51 ++++++++
 .../kafka2/KafkaPartitionLevelStreamConfig.java    | 144 +++++++++++++++++++++
 .../impl/kafka2/KafkaStreamConfigProperties.java   |  65 ++++++++++
 .../impl/kafka2/KafkaStreamMetadataProvider.java   |  81 ++++++++++++
 .../realtime/impl/kafka2/MessageAndOffset.java     |  49 +++++++
 pinot-connectors/pom.xml                           |   2 +
 11 files changed, 658 insertions(+)

diff --git a/pinot-connectors/pinot-connector-kafka-2.0/README.md b/pinot-connectors/pinot-connector-kafka-2.0/README.md
new file mode 100644
index 0000000..cc1950c
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/README.md
@@ -0,0 +1,24 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+# Pinot connector for kafka 2.0.x
+
+This is an implementation of the Kafka stream for Kafka versions 2.0.x. The version used in this implementation is Kafka 2.0.0; we have not tested whether it runs with older versions.
+A stream plugin for another version of Kafka, or another stream, can be added in a similar fashion. Refer to the documentation on [Pluggable Streams](https://pinot.readthedocs.io/en/latest/pluggable_streams.html) for the specific interfaces to implement.
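
For context, a minimal sketch (not part of this diff) of the streamConfigs map a realtime table could use to point at this connector. The broker-list key matches KafkaStreamConfigProperties in this commit; the remaining stream.kafka.* key names, the topic, and the broker address are illustrative assumptions based on the Pluggable Streams documentation:

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: stream configs for a realtime table using the Kafka 2.0 connector.
public class Kafka2StreamConfigsSketch {
  public static Map<String, String> streamConfigs() {
    Map<String, String> streamConfigs = new HashMap<>();
    streamConfigs.put("streamType", "kafka");
    streamConfigs.put("stream.kafka.consumer.type", "lowlevel");           // partition-level consumer (assumed key)
    streamConfigs.put("stream.kafka.topic.name", "myTopic");               // illustrative topic name
    streamConfigs.put("stream.kafka.broker.list", "localhost:9092");       // key defined by KafkaStreamConfigProperties
    streamConfigs.put("stream.kafka.consumer.factory.class.name",
        "org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory"); // factory added in this commit
    return streamConfigs;
  }
}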
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/pom.xml b/pinot-connectors/pinot-connector-kafka-2.0/pom.xml
new file mode 100644
index 0000000..f351219
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/pom.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>pinot-connectors</artifactId>
+        <groupId>org.apache.pinot</groupId>
+        <version>0.2.0-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>pinot-connector-kafka-2.0</artifactId>
+
+    <properties>
+        <pinot.root>${basedir}/../..</pinot.root>
+        <kafka.version>2.0.0</kafka.version>
+    </properties>
+
+    <dependencies>
+
+        <!-- Kafka  -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>net.sf.jopt-simple</groupId>
+                    <artifactId>jopt-simple</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scala-library</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConnectionHandler.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConnectionHandler.java
new file mode 100644
index 0000000..802062f
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConnectionHandler.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+
+public abstract class KafkaConnectionHandler {
+
+    protected final KafkaPartitionLevelStreamConfig _config;
+    protected final int _partition;
+    protected final String _topic;
+    protected final Consumer<String, byte[]> _consumer;
+    protected final TopicPartition _topicPartition;
+
+    public KafkaConnectionHandler(StreamConfig streamConfig, int partition) {
+        _config = new KafkaPartitionLevelStreamConfig(streamConfig);
+        _partition = partition;
+        _topic = _config.getKafkaTopicName();
+        Properties consumerProp = new Properties();
+        consumerProp.putAll(streamConfig.getStreamConfigsMap());
+        consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
+        consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        _consumer = new KafkaConsumer<>(consumerProp);
+        _topicPartition = new TopicPartition(_topic, _partition);
+        _consumer.assign(Collections.singletonList(_topicPartition));
+
+    }
+
+    public void close() throws IOException {
+        _consumer.close();
+    }
+
+
+}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java
new file mode 100644
index 0000000..cc3d8a6
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
+import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
+import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
+import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
+
+public class KafkaConsumerFactory extends StreamConsumerFactory {
+    @Override
+    public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {
+        return new KafkaPartitionConsumer(_streamConfig, partition);
+    }
+
+    @Override
+    public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema, InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) {
+        throw new UnsupportedOperationException("High level consumer not supported in kafka 2. Use Kafka partition level consumers");
+    }
+
+    @Override
+    public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
+        return null;
+    }
+
+    @Override
+    public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
+        throw new UnsupportedOperationException("High level consumer not supported in kafka 2. Use Kafka partition level consumers");
+    }
+}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaMessageBatch.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaMessageBatch.java
new file mode 100644
index 0000000..22aa683
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaMessageBatch.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.pinot.core.realtime.stream.MessageBatch;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class KafkaMessageBatch implements MessageBatch<byte[]> {
+
+    private List<MessageAndOffset> messageList = new ArrayList<>();
+
+    public KafkaMessageBatch(Iterable<ConsumerRecord<String, byte[]>> iterable) {
+        for (ConsumerRecord<String, byte[]> record : iterable) {
+            messageList.add(new MessageAndOffset(record.value(), record.offset()));
+        }
+    }
+
+    @Override
+    public int getMessageCount() {
+        return messageList.size();
+    }
+
+    @Override
+    public byte[] getMessageAtIndex(int index) {
+        return messageList.get(index).getMessage().array();
+    }
+
+    @Override
+    public int getMessageOffsetAtIndex(int index) {
+        return messageList.get(index).getMessage().arrayOffset();
+    }
+
+    @Override
+    public int getMessageLengthAtIndex(int index) {
+        return messageList.get(index).getMessage().array().length;
+    }
+
+    @Override
+    public long getNextStreamMessageOffsetAtIndex(int index) {
+        return messageList.get(index).getNextOffset();
+    }
+
+    public Iterable<MessageAndOffset> iterable() {
+        return messageList;
+    }
+}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionConsumer.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionConsumer.java
new file mode 100644
index 0000000..de3295d
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionConsumer.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import org.apache.kafka.clients.consumer.*;
+
+import org.apache.pinot.core.realtime.stream.MessageBatch;
+import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+
+public class KafkaPartitionConsumer extends KafkaConnectionHandler implements PartitionLevelConsumer {
+
+
+    public KafkaPartitionConsumer(StreamConfig streamConfig, int partition) {
+        super(streamConfig, partition);
+    }
+
+    @Override
+    public MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis) throws TimeoutException {
+        _consumer.seek(_topicPartition, startOffset);
+
+        ConsumerRecords<String, byte[]> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
+        List<ConsumerRecord<String, byte[]>> records = consumerRecords.records(_topicPartition);
+        return new KafkaMessageBatch(records);
+    }
+
+    @Override
+    public void close() throws IOException {
+        super.close();
+    }
+}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfig.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfig.java
new file mode 100644
index 0000000..c154a38
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfig.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pinot.common.utils.EqualityUtils;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class KafkaPartitionLevelStreamConfig {
+
+    private final String _kafkaTopicName;
+    private final String _bootstrapHosts;
+    private final int _kafkaBufferSize;
+    private final int _kafkaSocketTimeout;
+    private final int _kafkaFetcherSizeBytes;
+    private final int _kafkaFetcherMinBytes;
+    private final Map<String, String> _streamConfigMap;
+
+    /**
+     * Builds a wrapper around {@link StreamConfig} to fetch kafka partition level consumer related configs
+     * @param streamConfig
+     */
+    public KafkaPartitionLevelStreamConfig(StreamConfig streamConfig) {
+        _streamConfigMap = streamConfig.getStreamConfigsMap();
+
+        _kafkaTopicName = streamConfig.getTopicName();
+
+        String llcBrokerListKey = KafkaStreamConfigProperties
+                .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST);
+        String llcBufferKey = KafkaStreamConfigProperties
+                .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE);
+        String llcTimeoutKey = KafkaStreamConfigProperties
+                .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT);
+        String fetcherSizeKey = KafkaStreamConfigProperties
+                .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_SIZE_BYTES);
+        String fetcherMinBytesKey = KafkaStreamConfigProperties
+                .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES);
+        _bootstrapHosts = _streamConfigMap.get(llcBrokerListKey);
+        _kafkaBufferSize = getIntConfigWithDefault(_streamConfigMap, llcBufferKey,
+                KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT);
+        _kafkaSocketTimeout = getIntConfigWithDefault(_streamConfigMap, llcTimeoutKey,
+                KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT);
+        _kafkaFetcherSizeBytes = getIntConfigWithDefault(_streamConfigMap, fetcherSizeKey, _kafkaBufferSize);
+        _kafkaFetcherMinBytes = getIntConfigWithDefault(_streamConfigMap, fetcherMinBytesKey,
+                KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT);
+        Preconditions.checkNotNull(_bootstrapHosts,
+                "Must specify kafka brokers list " + llcBrokerListKey + " in case of low level kafka consumer");
+    }
+
+    public String getKafkaTopicName() {
+        return _kafkaTopicName;
+    }
+
+    public String getBootstrapHosts() {
+        return _bootstrapHosts;
+    }
+
+    public int getKafkaBufferSize() {
+        return _kafkaBufferSize;
+    }
+
+    public int getKafkaSocketTimeout() {
+        return _kafkaSocketTimeout;
+    }
+
+    public int getKafkaFetcherSizeBytes() {
+        return _kafkaFetcherSizeBytes;
+    }
+
+    public int getKafkaFetcherMinBytes() {
+        return _kafkaFetcherMinBytes;
+    }
+
+    private int getIntConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) {
+        String stringValue = configMap.get(key);
+        try {
+            if (StringUtils.isNotEmpty(stringValue)) {
+                return Integer.parseInt(stringValue);
+            }
+            return defaultValue;
+        } catch (NumberFormatException ex) {
+            return defaultValue;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaLowLevelStreamConfig{" + "_kafkaTopicName='" + _kafkaTopicName + '\'' + ", _bootstrapHosts='"
+                + _bootstrapHosts + '\'' + ", _kafkaBufferSize='" + _kafkaBufferSize + '\'' + ", _kafkaSocketTimeout='"
+                + _kafkaSocketTimeout + '\'' + ", _kafkaFetcherSizeBytes='" + _kafkaFetcherSizeBytes + '\'' + ", _kafkaFetcherMinBytes='"
+                + _kafkaFetcherMinBytes + '\'' + '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (EqualityUtils.isSameReference(this, o)) {
+            return true;
+        }
+
+        if (EqualityUtils.isNullOrNotSameClass(this, o)) {
+            return false;
+        }
+
+        KafkaPartitionLevelStreamConfig that = (KafkaPartitionLevelStreamConfig) o;
+
+        return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName) && EqualityUtils
+                .isEqual(_bootstrapHosts, that._bootstrapHosts) && EqualityUtils
+                .isEqual(_kafkaBufferSize, that._kafkaBufferSize) && EqualityUtils
+                .isEqual(_kafkaSocketTimeout, that._kafkaSocketTimeout) && EqualityUtils
+                .isEqual(_kafkaFetcherSizeBytes, that._kafkaFetcherSizeBytes) && EqualityUtils
+                .isEqual(_kafkaFetcherMinBytes, that._kafkaFetcherMinBytes);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = EqualityUtils.hashCodeOf(_kafkaTopicName);
+        result = EqualityUtils.hashCodeOf(result, _bootstrapHosts);
+        result = EqualityUtils.hashCodeOf(result, _kafkaBufferSize);
+        result = EqualityUtils.hashCodeOf(result, _kafkaSocketTimeout);
+        result = EqualityUtils.hashCodeOf(result, _kafkaFetcherSizeBytes);
+        result = EqualityUtils.hashCodeOf(result, _kafkaFetcherMinBytes);
+        return result;
+    }
+}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamConfigProperties.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamConfigProperties.java
new file mode 100644
index 0000000..3c45d6e
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamConfigProperties.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.base.Joiner;
+import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
+
+
+/**
+ * Property key definitions for all kafka stream related properties
+ */
+public class KafkaStreamConfigProperties {
+  public static final String DOT_SEPARATOR = ".";
+  public static final String STREAM_TYPE = "kafka";
+
+  public static class HighLevelConsumer {
+    public static final String KAFKA_HLC_ZK_CONNECTION_STRING = "kafka.hlc.zk.connect.string";
+    public static final String ZK_SESSION_TIMEOUT_MS = "zookeeper.session.timeout.ms";
+    public static final String ZK_CONNECTION_TIMEOUT_MS = "zookeeper.connection.timeout.ms";
+    public static final String ZK_SYNC_TIME_MS = "zookeeper.sync.time.ms";
+    public static final String REBALANCE_MAX_RETRIES = "rebalance.max.retries";
+    public static final String REBALANCE_BACKOFF_MS = "rebalance.backoff.ms";
+    public static final String AUTO_COMMIT_ENABLE = "auto.commit.enable";
+    public static final String AUTO_OFFSET_RESET = "auto.offset.reset";
+  }
+
+  public static class LowLevelConsumer {
+    public static final String KAFKA_BROKER_LIST = "kafka.broker.list";
+    public static final String KAFKA_BUFFER_SIZE = "kafka.buffer.size";
+    public static final int KAFKA_BUFFER_SIZE_DEFAULT = 512000;
+    public static final String KAFKA_SOCKET_TIMEOUT = "kafka.socket.timeout";
+    public static final int KAFKA_SOCKET_TIMEOUT_DEFAULT = 10000;
+    public static final String KAFKA_FETCHER_SIZE_BYTES = "kafka.fetcher.size";
+    public static final String KAFKA_FETCHER_MIN_BYTES = "kafka.fetcher.minBytes";
+    public static final int KAFKA_FETCHER_MIN_BYTES_DEFAULT = 100000;
+  }
+
+  public static final String KAFKA_CONSUMER_PROP_PREFIX = "kafka.consumer.prop";
+
+  /**
+   * Helper method to create a property string for kafka stream
+   * @param property
+   * @return
+   */
+  public static String constructStreamProperty(String property) {
+    return Joiner.on(DOT_SEPARATOR).join(StreamConfigProperties.STREAM_PREFIX, property);
+  }
+}
+
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamMetadataProvider.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamMetadataProvider.java
new file mode 100644
index 0000000..3871d85
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamMetadataProvider.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.base.Preconditions;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.pinot.core.realtime.stream.OffsetCriteria;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implements StreamMetadataProvider {
+
+    private AdminClient _adminClient;
+
+    public KafkaStreamMetadataProvider(StreamConfig streamConfig, int partition) {
+        super(streamConfig, partition);
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
+        _adminClient = AdminClient.create(props);
+    }
+
+    @Override
+    public int fetchPartitionCount(long timeoutMillis) {
+        DescribeTopicsResult result = _adminClient.describeTopics(Collections.singletonList(_config.getKafkaTopicName()));
+        Map<String, KafkaFuture<TopicDescription>> values = result.values();
+        KafkaFuture<TopicDescription> topicDescription = values.get(_config.getKafkaTopicName());
+        try {
+            return topicDescription.get().partitions().size();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Failed to fetch partition count for topic " + _config.getKafkaTopicName(), e);
+        }
+    }
+
+    @Override
+    public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) throws TimeoutException {
+
+        Preconditions.checkNotNull(offsetCriteria);
+        if (offsetCriteria.isLargest()) {
+            return _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis)).get(_topicPartition);
+        } else if (offsetCriteria.isSmallest()) {
+            return _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis)).get(_topicPartition);
+        } else {
+            throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria.toString());
+        }
+
+    }
+
+    @Override
+    public void close() throws IOException {
+        super.close();
+    }
+}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/MessageAndOffset.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/MessageAndOffset.java
new file mode 100644
index 0000000..0dea267
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/MessageAndOffset.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import java.nio.ByteBuffer;
+
+public class MessageAndOffset {
+
+    private ByteBuffer _message;
+    private long _offset;
+
+    public MessageAndOffset(byte[] message, long offset) {
+        _message = ByteBuffer.wrap(message);
+        _offset = offset;
+    }
+
+    public MessageAndOffset(ByteBuffer message, long offset) {
+        _message = message;
+        _offset = offset;
+    }
+
+    public ByteBuffer getMessage() {
+        return _message;
+    }
+
+    public long getOffset() {
+        return _offset;
+    }
+
+    public long getNextOffset() {
+        return _offset + 1;
+    }
+}
diff --git a/pinot-connectors/pom.xml b/pinot-connectors/pom.xml
index 3695189..64d798d 100644
--- a/pinot-connectors/pom.xml
+++ b/pinot-connectors/pom.xml
@@ -32,12 +32,14 @@
   <artifactId>pinot-connectors</artifactId>
   <packaging>pom</packaging>
   <name>Pinot Connectors</name>
+  <url>https://pinot.apache.org/</url>
   <properties>
     <pinot.root>${basedir}/..</pinot.root>
   </properties>
 
   <modules>
     <module>pinot-connector-kafka-0.9</module>
+    <module>pinot-connector-kafka-2.0</module>
   </modules>
 
   <dependencies>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 02/02: Adding support for Kafka 2.0

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch kafka_2.0
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit d9e031618c4c7fa28e64d62858aa7e4a36d6f279
Author: Xiang Fu <xi...@traceable.ai>
AuthorDate: Mon Jul 1 16:17:25 2019 -0700

    Adding support for Kafka 2.0
---
 pinot-connectors/pinot-connector-kafka-0.9/pom.xml |   5 +
 pinot-connectors/pinot-connector-kafka-2.0/pom.xml | 112 +++++---
 ...umerFactory.java => Kafka2ConsumerFactory.java} |  36 +--
 .../impl/kafka2/Kafka2ConsumerManager.java         | 191 ++++++++++++++
 .../impl/kafka2/Kafka2HighLevelStreamConfig.java   | 135 ++++++++++
 .../realtime/impl/kafka2/Kafka2MessageBatch.java   |  61 +++++
 .../Kafka2PartitionLevelConnectionHandler.java     |  67 +++++
 ...Kafka2PartitionLevelPartitionLevelConsumer.java |  65 +++++
 .../kafka2/Kafka2PartitionLevelStreamConfig.java   | 146 +++++++++++
 ...Kafka2PartitionLevelStreamMetadataProvider.java |  67 +++++
 ...ties.java => Kafka2StreamConfigProperties.java} |  32 +--
 .../impl/kafka2/Kafka2StreamLevelConsumer.java     | 166 ++++++++++++
 .../impl/kafka2/KafkaAvroMessageDecoder.java       | 290 +++++++++++++++++++++
 .../impl/kafka2/KafkaConnectionHandler.java        |  61 -----
 .../impl/kafka2/KafkaJSONMessageDecoder.java       |  63 +++++
 .../realtime/impl/kafka2/KafkaMessageBatch.java    |  65 -----
 .../impl/kafka2/KafkaPartitionConsumer.java        |  51 ----
 .../kafka2/KafkaPartitionLevelStreamConfig.java    | 144 ----------
 .../impl/kafka2/KafkaStreamMetadataProvider.java   |  81 ------
 .../realtime/impl/kafka2/MessageAndOffset.java     |  42 +--
 .../kafka2/KafkaPartitionLevelConsumerTest.java    | 232 +++++++++++++++++
 .../KafkaPartitionLevelStreamConfigTest.java       | 161 ++++++++++++
 .../impl/kafka2/utils/EmbeddedZooKeeper.java       |  60 +++++
 .../impl/kafka2/utils/MiniKafkaCluster.java        | 175 +++++++++++++
 pinot-connectors/pom.xml                           |  12 +
 25 files changed, 2024 insertions(+), 496 deletions(-)

diff --git a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
index ae0317e..852c29c 100644
--- a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
+++ b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
@@ -63,5 +63,10 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>2.10.5</version>
+    </dependency>
   </dependencies>
 </project>
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/pom.xml b/pinot-connectors/pinot-connector-kafka-2.0/pom.xml
index f351219..2a9c155 100644
--- a/pinot-connectors/pinot-connector-kafka-2.0/pom.xml
+++ b/pinot-connectors/pinot-connector-kafka-2.0/pom.xml
@@ -22,46 +22,82 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>pinot-connectors</artifactId>
-        <groupId>org.apache.pinot</groupId>
-        <version>0.2.0-SNAPSHOT</version>
-        <relativePath>..</relativePath>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>pinot-connectors</artifactId>
+    <groupId>org.apache.pinot</groupId>
+    <version>0.2.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>pinot-connector-kafka-2.0</artifactId>
+  <name>Pinot Connector Kafka 2.0</name>
+  <url>https://pinot.apache.org/</url>
+  <properties>
+    <pinot.root>${basedir}/../..</pinot.root>
+    <kafka.version>2.0.0</kafka.version>
+  </properties>
 
-    <artifactId>pinot-connector-kafka-2.0</artifactId>
+  <dependencies>
 
-    <properties>
-        <pinot.root>${basedir}/../..</pinot.root>
-        <kafka.version>2.0.0</kafka.version>
-    </properties>
+    <!-- Kafka  -->
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.sf.jopt-simple</groupId>
+          <artifactId>jopt-simple</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.scala-lang</groupId>
+          <artifactId>scala-library</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
 
-    <dependencies>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.12</artifactId>
+      <version>${kafka.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.sf.jopt-simple</groupId>
+          <artifactId>jopt-simple</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.scala-lang</groupId>
+          <artifactId>scala-library</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.scala-lang</groupId>
+          <artifactId>scala-reflect</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+      <scope>test</scope>
+    </dependency>
 
-        <!-- Kafka  -->
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>${kafka.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>net.sf.jopt-simple</groupId>
-                    <artifactId>jopt-simple</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.scala-lang</groupId>
-                    <artifactId>scala-library</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-    </dependencies>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>2.12.8</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
 </project>
\ No newline at end of file
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerFactory.java
similarity index 57%
rename from pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java
rename to pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerFactory.java
index cc3d8a6..3eab517 100644
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerFactory.java
@@ -26,24 +26,26 @@ import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
 import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
 import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
 
-public class KafkaConsumerFactory extends StreamConsumerFactory {
-    @Override
-    public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {
-        return new KafkaPartitionConsumer(_streamConfig, partition);
-    }
 
-    @Override
-    public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema, InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) {
-        throw new UnsupportedOperationException("High level consumer not supported in kafka 2. Use Kafka partition level consumers");
-    }
+public class Kafka2ConsumerFactory extends StreamConsumerFactory {
+  @Override
+  public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {
+    return new Kafka2PartitionLevelPartitionLevelConsumer(clientId, _streamConfig, partition);
+  }
 
-    @Override
-    public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
-        return null;
-    }
+  @Override
+  public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema,
+      InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) {
+    return new Kafka2StreamLevelConsumer(clientId, tableName, _streamConfig, schema, instanceZKMetadata, serverMetrics);
+  }
 
-    @Override
-    public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
-        throw new UnsupportedOperationException("High level consumer not supported in kafka 2. Use Kafka partition level consumers");
-    }
+  @Override
+  public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
+    return new Kafka2PartitionLevelStreamMetadataProvider(clientId, _streamConfig, partition);
+  }
+
+  @Override
+  public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
+    return new Kafka2PartitionLevelStreamMetadataProvider(clientId, _streamConfig);
+  }
 }
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerManager.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerManager.java
new file mode 100644
index 0000000..74e3ee2
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerManager.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.ImmutableTriple;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Manager for Kafka consumers that reuses consumers and delays their shutdown.
+ *
+ * This is a workaround for the current realtime design flaw where any issue while flushing/committing offsets causes
+ * duplicate or dropped events. Kafka consumption is driven by the controller, which assigns a realtime segment to the
+ * servers; when a server is assigned a new realtime segment, it creates a Kafka consumer, consumes until it reaches a
+ * threshold then flushes to disk, writes metadata to helix indicating the segment is completed, commits Kafka offsets
+ * to ZK and then shuts down the consumer. The controller notices the metadata write and reassigns a segment to the
+ * server, so that it can keep on consuming.
+ *
+ * This logic is flawed if committing Kafka offsets fails, at which time the committed state is unknown. The proper fix
+ * would be to just keep on using that consumer and try committing our offsets later, but we recreate a new Kafka
+ * consumer whenever we get a new segment and also keep the old consumer around, leading to half the events being
+ * assigned, due to Kafka rebalancing the partitions between the two consumers (one of which is not actually reading
+ * anything anymore). Because that logic is stateless and driven by Helix, there's no real clean way to keep the
+ * consumer alive and pass it to the next segment.
+ *
+ * This class and long comment is to work around this issue by keeping the consumer alive for a little bit instead of
+ * shutting it down immediately, so that the next segment assignment can pick up the same consumer. This way, even if
+ * committing the offsets fails, we can still pick up the same consumer the next time we get a segment assigned to us
+ * by the controller and hopefully commit our offsets the next time we flush to disk.
+ *
+ * This temporary code should be completely removed by the time we redesign the consumption to use the lower level
+ * Kafka APIs.
+ */
+public class Kafka2ConsumerManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Kafka2ConsumerManager.class);
+  private static final Long IN_USE = -1L;
+  private static final long CONSUMER_SHUTDOWN_DELAY_MILLIS = TimeUnit.SECONDS.toMillis(60); // One minute
+  private static final Map<ImmutableTriple<String, String, String>, KafkaConsumer> CONSUMER_FOR_CONFIG_KEY =
+      new HashMap<>();
+  private static final IdentityHashMap<KafkaConsumer, Long> CONSUMER_RELEASE_TIME = new IdentityHashMap<>();
+
+  public static KafkaConsumer acquireKafkaConsumerForConfig(Kafka2HighLevelStreamConfig kafka2HighLevelStreamConfig) {
+    final ImmutableTriple<String, String, String> configKey =
+        new ImmutableTriple<>(kafka2HighLevelStreamConfig.getKafkaTopicName(), kafka2HighLevelStreamConfig.getGroupId(),
+            kafka2HighLevelStreamConfig.getBootstrapServers());
+
+    synchronized (Kafka2ConsumerManager.class) {
+      // If we have the consumer and it's not already acquired, return it, otherwise error out if it's already acquired
+      if (CONSUMER_FOR_CONFIG_KEY.containsKey(configKey)) {
+        KafkaConsumer kafkaConsumer = CONSUMER_FOR_CONFIG_KEY.get(configKey);
+        if (CONSUMER_RELEASE_TIME.get(kafkaConsumer).equals(IN_USE)) {
+          throw new RuntimeException("Consumer/iterator " + kafkaConsumer + " already in use!");
+        } else {
+          LOGGER.info("Reusing kafka consumer/iterator with id {}", kafkaConsumer);
+          CONSUMER_RELEASE_TIME.put(kafkaConsumer, IN_USE);
+          return kafkaConsumer;
+        }
+      }
+
+      LOGGER.info("Creating new kafka consumer and iterator for topic {}",
+          kafka2HighLevelStreamConfig.getKafkaTopicName());
+
+      // Create the consumer
+
+      Properties consumerProp = new Properties();
+      consumerProp.putAll(kafka2HighLevelStreamConfig.getKafkaConsumerProperties());
+      consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka2HighLevelStreamConfig.getBootstrapServers());
+      consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+      consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+      KafkaConsumer consumer = new KafkaConsumer<>(consumerProp);
+      consumer.subscribe(Collections.singletonList(kafka2HighLevelStreamConfig.getKafkaTopicName()));
+
+      // Mark both the consumer and iterator as acquired
+      CONSUMER_FOR_CONFIG_KEY.put(configKey, consumer);
+      CONSUMER_RELEASE_TIME.put(consumer, IN_USE);
+
+      LOGGER.info("Created consumer/iterator with id {} for topic {}", consumer,
+          kafka2HighLevelStreamConfig.getKafkaTopicName());
+
+      return consumer;
+    }
+  }
+
+  public static void releaseKafkaConsumer(final KafkaConsumer kafkaConsumer) {
+    synchronized (Kafka2ConsumerManager.class) {
+      // Release the consumer, mark it for shutdown in the future
+      final long releaseTime = System.currentTimeMillis() + CONSUMER_SHUTDOWN_DELAY_MILLIS;
+      CONSUMER_RELEASE_TIME.put(kafkaConsumer, releaseTime);
+
+      LOGGER.info("Marking consumer/iterator with id {} for release at {}", kafkaConsumer, releaseTime);
+
+      // Schedule the shutdown of the consumer
+      new Thread() {
+        @Override
+        public void run() {
+          try {
+            // Await the shutdown time
+            Uninterruptibles.sleepUninterruptibly(CONSUMER_SHUTDOWN_DELAY_MILLIS, TimeUnit.MILLISECONDS);
+
+            // Shutdown all consumers that have not been re-acquired
+            synchronized (Kafka2ConsumerManager.class) {
+              LOGGER.info("Executing release check for consumer/iterator {} at {}, scheduled at ", kafkaConsumer,
+                  System.currentTimeMillis(), releaseTime);
+
+              Iterator<Map.Entry<ImmutableTriple<String, String, String>, KafkaConsumer>> configIterator =
+                  CONSUMER_FOR_CONFIG_KEY.entrySet().iterator();
+
+              while (configIterator.hasNext()) {
+                Map.Entry<ImmutableTriple<String, String, String>, KafkaConsumer> entry = configIterator.next();
+                KafkaConsumer kafkaConsumer = entry.getValue();
+
+                final Long releaseTime = CONSUMER_RELEASE_TIME.get(kafkaConsumer);
+                if (!releaseTime.equals(IN_USE) && releaseTime < System.currentTimeMillis()) {
+                  LOGGER.info("Releasing consumer/iterator {}", kafkaConsumer);
+
+                  try {
+                    kafkaConsumer.close();
+                  } catch (Exception e) {
+                    LOGGER.warn("Caught exception while shutting down Kafka consumer with id {}", kafkaConsumer, e);
+                  }
+
+                  configIterator.remove();
+                  CONSUMER_RELEASE_TIME.remove(kafkaConsumer);
+                } else {
+                  LOGGER.info("Not releasing consumer/iterator {}, it has been reacquired", kafkaConsumer);
+                }
+              }
+            }
+          } catch (Exception e) {
+            LOGGER.warn("Caught exception in release of consumer/iterator {}", e, kafkaConsumer);
+          }
+        }
+      }.start();
+    }
+  }
+
+  public static void closeAllConsumers() {
+    try {
+      // Shutdown all consumers
+      synchronized (Kafka2ConsumerManager.class) {
+        LOGGER.info("Trying to shutdown all the kafka consumers");
+        Iterator<KafkaConsumer> consumerIterator = CONSUMER_FOR_CONFIG_KEY.values().iterator();
+
+        while (consumerIterator.hasNext()) {
+          KafkaConsumer kafkaConsumer = consumerIterator.next();
+          LOGGER.info("Trying to shutdown consumer/iterator {}", kafkaConsumer);
+          try {
+            kafkaConsumer.close();
+          } catch (Exception e) {
+            LOGGER.warn("Caught exception while shutting down Kafka consumer with id {}", kafkaConsumer, e);
+          }
+          consumerIterator.remove();
+        }
+        CONSUMER_FOR_CONFIG_KEY.clear();
+        CONSUMER_RELEASE_TIME.clear();
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception during shutting down all kafka consumers", e);
+    }
+  }
+}
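
For context, a minimal caller sketch (not part of this diff) of the acquire/release lifecycle described in the class Javadoc above; it assumes a StreamConfig, table name, and InstanceZKMetadata are already available, as they are in Kafka2StreamLevelConsumer:

package org.apache.pinot.core.realtime.impl.kafka2;

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.core.realtime.stream.StreamConfig;

// Hypothetical usage sketch of Kafka2ConsumerManager.
public class Kafka2ConsumerManagerUsageSketch {
  void consumeOnce(StreamConfig streamConfig, String tableName, InstanceZKMetadata instanceZKMetadata) {
    Kafka2HighLevelStreamConfig config =
        new Kafka2HighLevelStreamConfig(streamConfig, tableName, instanceZKMetadata);
    // Reuses a cached consumer for this (topic, group id, bootstrap servers) triple if one is idle
    KafkaConsumer consumer = Kafka2ConsumerManager.acquireKafkaConsumerForConfig(config);
    try {
      ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
      // ... decode and index the records into the consuming segment ...
    } finally {
      // Marks the consumer for delayed shutdown; a re-acquire within ~60 seconds reuses it
      Kafka2ConsumerManager.releaseKafkaConsumer(consumer);
    }
  }
}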
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2HighLevelStreamConfig.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2HighLevelStreamConfig.java
new file mode 100644
index 0000000..f866288
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2HighLevelStreamConfig.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
+import org.apache.pinot.common.utils.EqualityUtils;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
+
+
+/**
+ * Wrapper around {@link StreamConfig} for use in the {@link Kafka2StreamLevelConsumer}
+ */
+public class Kafka2HighLevelStreamConfig {
+  private static final String DEFAULT_AUTO_COMMIT_ENABLE = "false";
+
+  private static final Map<String, String> defaultProps;
+  private String _kafkaTopicName;
+  private String _groupId;
+  private String _bootstrapServers;
+  private Map<String, String> _kafkaConsumerProperties;
+
+  /**
+   * Builds a wrapper around {@link StreamConfig} to fetch kafka stream level consumer specific configs
+   * @param streamConfig
+   * @param tableName
+   * @param instanceZKMetadata
+   */
+  public Kafka2HighLevelStreamConfig(StreamConfig streamConfig, String tableName,
+      InstanceZKMetadata instanceZKMetadata) {
+    Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
+
+    _kafkaTopicName = streamConfig.getTopicName();
+    String hlcBootstrapBrokerUrlKey = Kafka2StreamConfigProperties
+        .constructStreamProperty(Kafka2StreamConfigProperties.HighLevelConsumer.KAFKA_HLC_BOOTSTRAP_SERVER);
+    _bootstrapServers = streamConfigMap.get(hlcBootstrapBrokerUrlKey);
+    Preconditions.checkNotNull(_bootstrapServers,
+        "Must specify bootstrap broker connect string " + hlcBootstrapBrokerUrlKey + " in high level kafka consumer");
+    _groupId = instanceZKMetadata.getGroupId(tableName);
+
+    _kafkaConsumerProperties = new HashMap<>();
+    String kafkaConsumerPropertyPrefix =
+        Kafka2StreamConfigProperties.constructStreamProperty(Kafka2StreamConfigProperties.KAFKA_CONSUMER_PROP_PREFIX);
+    for (String key : streamConfigMap.keySet()) {
+      if (key.startsWith(kafkaConsumerPropertyPrefix)) {
+        _kafkaConsumerProperties
+            .put(StreamConfigProperties.getPropertySuffix(key, kafkaConsumerPropertyPrefix), streamConfigMap.get(key));
+      }
+    }
+  }
+
+  public String getKafkaTopicName() {
+    return _kafkaTopicName;
+  }
+
+  public String getGroupId() {
+    return _groupId;
+  }
+
+  public Properties getKafkaConsumerProperties() {
+    Properties props = new Properties();
+    for (String key : defaultProps.keySet()) {
+      props.put(key, defaultProps.get(key));
+    }
+    for (String key : _kafkaConsumerProperties.keySet()) {
+      props.put(key, _kafkaConsumerProperties.get(key));
+    }
+    props.put("group.id", _groupId);
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _bootstrapServers);
+    return props;
+  }
+
+  @Override
+  public String toString() {
+    return "Kafka2HighLevelStreamConfig{" + "_kafkaTopicName='" + _kafkaTopicName + '\'' + ", _groupId='" + _groupId
+        + '\'' + ", _bootstrapServers='" + _bootstrapServers + '\'' + ", _kafkaConsumerProperties="
+        + _kafkaConsumerProperties + '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (EqualityUtils.isSameReference(this, o)) {
+      return true;
+    }
+
+    if (EqualityUtils.isNullOrNotSameClass(this, o)) {
+      return false;
+    }
+
+    Kafka2HighLevelStreamConfig that = (Kafka2HighLevelStreamConfig) o;
+
+    return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName) && EqualityUtils
+        .isEqual(_groupId, that._groupId) && EqualityUtils.isEqual(_bootstrapServers, that._bootstrapServers)
+        && EqualityUtils.isEqual(_kafkaConsumerProperties, that._kafkaConsumerProperties);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = EqualityUtils.hashCodeOf(_kafkaTopicName);
+    result = EqualityUtils.hashCodeOf(result, _groupId);
+    result = EqualityUtils.hashCodeOf(result, _bootstrapServers);
+    result = EqualityUtils.hashCodeOf(result, _kafkaConsumerProperties);
+    return result;
+  }
+
+  public String getBootstrapServers() {
+    return _bootstrapServers;
+  }
+
+  static {
+    defaultProps = new HashMap<>();
+    defaultProps.put(Kafka2StreamConfigProperties.HighLevelConsumer.AUTO_COMMIT_ENABLE, DEFAULT_AUTO_COMMIT_ENABLE);
+  }
+}
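
Reviewer note: the property handling above strips the consumer-property prefix from table-level stream configs before handing the remainder to the Kafka client. A minimal standalone sketch of that prefix stripping; the config keys and values are hypothetical and the exact prefix string is assumed from Kafka2StreamConfigProperties:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class PrefixStrippingSketch {
      public static void main(String[] args) {
        // Hypothetical stream configs as they might appear in a realtime table config
        Map<String, String> streamConfigs = new HashMap<>();
        streamConfigs.put("stream.kafka.consumer.prop.auto.offset.reset", "largest");
        streamConfigs.put("stream.kafka.consumer.prop.fetch.max.wait.ms", "500");
        streamConfigs.put("stream.kafka.hlc.bootstrap.server", "localhost:9092");

        String prefix = "stream.kafka.consumer.prop.";
        Properties consumerProps = new Properties();
        for (Map.Entry<String, String> entry : streamConfigs.entrySet()) {
          if (entry.getKey().startsWith(prefix)) {
            // Keep only the suffix, e.g. "auto.offset.reset", as the Kafka consumer property name
            consumerProps.put(entry.getKey().substring(prefix.length()), entry.getValue());
          }
        }
        System.out.println(consumerProps); // e.g. {fetch.max.wait.ms=500, auto.offset.reset=largest}
      }
    }
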
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2MessageBatch.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2MessageBatch.java
new file mode 100644
index 0000000..13bd41b
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2MessageBatch.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.pinot.core.realtime.stream.MessageBatch;
+
+
+public class Kafka2MessageBatch implements MessageBatch<byte[]> {
+
+  private List<MessageAndOffset> messageList = new ArrayList<>();
+
+  public Kafka2MessageBatch(Iterable<ConsumerRecord<String, byte[]>> iterable) {
+    for (ConsumerRecord<String, byte[]> record : iterable) {
+      messageList.add(new MessageAndOffset(record.value(), record.offset()));
+    }
+  }
+
+  @Override
+  public int getMessageCount() {
+    return messageList.size();
+  }
+
+  @Override
+  public byte[] getMessageAtIndex(int index) {
+    return messageList.get(index).getMessage().array();
+  }
+
+  @Override
+  public int getMessageOffsetAtIndex(int index) {
+    return messageList.get(index).getMessage().arrayOffset();
+  }
+
+  @Override
+  public int getMessageLengthAtIndex(int index) {
+    return messageList.get(index).payloadSize();
+  }
+
+  @Override
+  public long getNextStreamMessageOffsetAtIndex(int index) {
+    return messageList.get(index).getNextOffset();
+  }
+}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelConnectionHandler.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelConnectionHandler.java
new file mode 100644
index 0000000..3f2550d
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelConnectionHandler.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+
+
+public abstract class Kafka2PartitionLevelConnectionHandler {
+
+  protected final Kafka2PartitionLevelStreamConfig _config;
+  protected final String _clientId;
+  protected final int _partition;
+  protected final String _topic;
+  protected final Consumer<String, byte[]> _consumer;
+  protected final TopicPartition _topicPartition;
+
+  public Kafka2PartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition) {
+    _config = new Kafka2PartitionLevelStreamConfig(streamConfig);
+    _clientId = clientId;
+    _partition = partition;
+    _topic = _config.getKafkaTopicName();
+    Properties consumerProp = new Properties();
+    consumerProp.putAll(streamConfig.getStreamConfigsMap());
+    consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
+    consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+    consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+    _consumer = new KafkaConsumer<>(consumerProp);
+    _topicPartition = new TopicPartition(_topic, _partition);
+    _consumer.assign(Collections.singletonList(_topicPartition));
+  }
+
+  public void close()
+      throws IOException {
+    _consumer.close();
+  }
+
+  @VisibleForTesting
+  protected Kafka2PartitionLevelStreamConfig getKafka2PartitionLevelStreamConfig() {
+    return _config;
+  }
+}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelPartitionLevelConsumer.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelPartitionLevelConsumer.java
new file mode 100644
index 0000000..19f520a
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelPartitionLevelConsumer.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.pinot.core.realtime.stream.MessageBatch;
+import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class Kafka2PartitionLevelPartitionLevelConsumer extends Kafka2PartitionLevelConnectionHandler implements PartitionLevelConsumer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(Kafka2PartitionLevelPartitionLevelConsumer.class);
+
+  public Kafka2PartitionLevelPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
+    super(clientId, streamConfig, partition);
+  }
+
+  @Override
+  public MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis)
+      throws TimeoutException {
+    _consumer.seek(_topicPartition, startOffset);
+    ConsumerRecords<String, byte[]> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
+    final Iterable<ConsumerRecord<String, byte[]>> messageAndOffsetIterable =
+        buildOffsetFilteringIterable(consumerRecords.records(_topicPartition), startOffset, endOffset);
+    return new Kafka2MessageBatch(messageAndOffsetIterable);
+  }
+
+  private Iterable<ConsumerRecord<String, byte[]>> buildOffsetFilteringIterable(
+      final List<ConsumerRecord<String, byte[]>> messageAndOffsets, final long startOffset, final long endOffset) {
+    return Iterables.filter(messageAndOffsets, input -> {
+      // Keep only non-null messages with an offset in [startOffset, endOffset); endOffset == -1 means no upper bound
+      return input != null && input.offset() >= startOffset && (endOffset > input.offset() || endOffset == -1);
+    });
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    super.close();
+  }
+}
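
Reviewer note: the filtering predicate above keeps records in the half-open range [startOffset, endOffset), treating endOffset == -1 as "no upper bound". A small standalone sketch of the same predicate, with made-up offsets:

    import java.util.function.LongPredicate;

    public class OffsetFilterSketch {
      static LongPredicate inRange(long startOffset, long endOffset) {
        return offset -> offset >= startOffset && (endOffset == -1 || offset < endOffset);
      }

      public static void main(String[] args) {
        LongPredicate bounded = inRange(100, 105);
        System.out.println(bounded.test(99));   // false: below startOffset
        System.out.println(bounded.test(104));  // true: inside [100, 105)
        System.out.println(bounded.test(105));  // false: endOffset is exclusive

        LongPredicate unbounded = inRange(100, -1);
        System.out.println(unbounded.test(1_000_000)); // true: no upper bound
      }
    }
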
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamConfig.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamConfig.java
new file mode 100644
index 0000000..fcc0e04
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamConfig.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pinot.common.utils.EqualityUtils;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+
+
+/**
+ * Wrapper around {@link StreamConfig} for use in {@link Kafka2PartitionLevelPartitionLevelConsumer}
+ */
+public class Kafka2PartitionLevelStreamConfig {
+
+  private final String _kafkaTopicName;
+  private final String _bootstrapHosts;
+  private final int _kafkaBufferSize;
+  private final int _kafkaSocketTimeout;
+  private final int _kafkaFetcherSizeBytes;
+  private final int _kafkaFetcherMinBytes;
+  private final Map<String, String> _streamConfigMap;
+
+  /**
+   * Builds a wrapper around {@link StreamConfig} to fetch kafka partition level consumer related configs
+   * @param streamConfig
+   */
+  public Kafka2PartitionLevelStreamConfig(StreamConfig streamConfig) {
+    _streamConfigMap = streamConfig.getStreamConfigsMap();
+
+    _kafkaTopicName = streamConfig.getTopicName();
+
+    String llcBrokerListKey = Kafka2StreamConfigProperties
+        .constructStreamProperty(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST);
+    String llcBufferKey = Kafka2StreamConfigProperties
+        .constructStreamProperty(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE);
+    String llcTimeoutKey = Kafka2StreamConfigProperties
+        .constructStreamProperty(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT);
+    String fetcherSizeKey = Kafka2StreamConfigProperties
+        .constructStreamProperty(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_SIZE_BYTES);
+    String fetcherMinBytesKey = Kafka2StreamConfigProperties
+        .constructStreamProperty(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES);
+    _bootstrapHosts = _streamConfigMap.get(llcBrokerListKey);
+    _kafkaBufferSize = getIntConfigWithDefault(_streamConfigMap, llcBufferKey,
+        Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT);
+    _kafkaSocketTimeout = getIntConfigWithDefault(_streamConfigMap, llcTimeoutKey,
+        Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT);
+    _kafkaFetcherSizeBytes = getIntConfigWithDefault(_streamConfigMap, fetcherSizeKey, _kafkaBufferSize);
+    _kafkaFetcherMinBytes = getIntConfigWithDefault(_streamConfigMap, fetcherMinBytesKey,
+        Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT);
+    Preconditions.checkNotNull(_bootstrapHosts,
+        "Must specify kafka brokers list " + llcBrokerListKey + " in case of low level kafka consumer");
+  }
+
+  public String getKafkaTopicName() {
+    return _kafkaTopicName;
+  }
+
+  public String getBootstrapHosts() {
+    return _bootstrapHosts;
+  }
+
+  public int getKafkaBufferSize() {
+    return _kafkaBufferSize;
+  }
+
+  public int getKafkaSocketTimeout() {
+    return _kafkaSocketTimeout;
+  }
+
+  public int getKafkaFetcherSizeBytes() {
+    return _kafkaFetcherSizeBytes;
+  }
+
+  public int getKafkaFetcherMinBytes() {
+    return _kafkaFetcherMinBytes;
+  }
+
+  private int getIntConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) {
+    String stringValue = configMap.get(key);
+    try {
+      if (StringUtils.isNotEmpty(stringValue)) {
+        return Integer.parseInt(stringValue);
+      }
+      return defaultValue;
+    } catch (NumberFormatException ex) {
+      return defaultValue;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "KafkaLowLevelStreamConfig{" + "_kafkaTopicName='" + _kafkaTopicName + '\'' + ", _bootstrapHosts='"
+        + _bootstrapHosts + '\'' + ", _kafkaBufferSize='" + _kafkaBufferSize + '\'' + ", _kafkaSocketTimeout='"
+        + _kafkaSocketTimeout + '\'' + ", _kafkaFetcherSizeBytes='" + _kafkaFetcherSizeBytes + '\''
+        + ", _kafkaFetcherMinBytes='" + _kafkaFetcherMinBytes + '\'' + '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (EqualityUtils.isSameReference(this, o)) {
+      return true;
+    }
+
+    if (EqualityUtils.isNullOrNotSameClass(this, o)) {
+      return false;
+    }
+
+    Kafka2PartitionLevelStreamConfig that = (Kafka2PartitionLevelStreamConfig) o;
+
+    return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName) && EqualityUtils
+        .isEqual(_bootstrapHosts, that._bootstrapHosts) && EqualityUtils
+        .isEqual(_kafkaBufferSize, that._kafkaBufferSize) && EqualityUtils
+        .isEqual(_kafkaSocketTimeout, that._kafkaSocketTimeout) && EqualityUtils
+        .isEqual(_kafkaFetcherSizeBytes, that._kafkaFetcherSizeBytes) && EqualityUtils
+        .isEqual(_kafkaFetcherMinBytes, that._kafkaFetcherMinBytes);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = EqualityUtils.hashCodeOf(_kafkaTopicName);
+    result = EqualityUtils.hashCodeOf(result, _bootstrapHosts);
+    result = EqualityUtils.hashCodeOf(result, _kafkaBufferSize);
+    result = EqualityUtils.hashCodeOf(result, _kafkaSocketTimeout);
+    result = EqualityUtils.hashCodeOf(result, _kafkaFetcherSizeBytes);
+    result = EqualityUtils.hashCodeOf(result, _kafkaFetcherMinBytes);
+    return result;
+  }
+}
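
Reviewer note: getIntConfigWithDefault above falls back to the default both when a key is absent and when its value does not parse as an integer. A short sketch of that fallback behavior; the keys and values are illustrative:

    import java.util.HashMap;
    import java.util.Map;

    public class IntConfigDefaultSketch {
      static int getIntConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) {
        String stringValue = configMap.get(key);
        if (stringValue == null || stringValue.isEmpty()) {
          return defaultValue;
        }
        try {
          return Integer.parseInt(stringValue);
        } catch (NumberFormatException ex) {
          return defaultValue;  // a malformed value falls back to the default as well
        }
      }

      public static void main(String[] args) {
        Map<String, String> configs = new HashMap<>();
        configs.put("stream.kafka.buffer.size", "262144");
        configs.put("stream.kafka.socket.timeout", "not-a-number");
        System.out.println(getIntConfigWithDefault(configs, "stream.kafka.buffer.size", 512000));      // 262144
        System.out.println(getIntConfigWithDefault(configs, "stream.kafka.socket.timeout", 10000));    // 10000
        System.out.println(getIntConfigWithDefault(configs, "stream.kafka.fetcher.minBytes", 100000)); // 100000
      }
    }
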
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamMetadataProvider.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamMetadataProvider.java
new file mode 100644
index 0000000..7a0558d
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamMetadataProvider.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nonnull;
+import org.apache.pinot.core.realtime.stream.OffsetCriteria;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
+
+
+public class Kafka2PartitionLevelStreamMetadataProvider extends Kafka2PartitionLevelConnectionHandler implements StreamMetadataProvider {
+
+  public Kafka2PartitionLevelStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
+    this(clientId, streamConfig, Integer.MIN_VALUE);
+  }
+
+  public Kafka2PartitionLevelStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) {
+    super(clientId, streamConfig, partition);
+  }
+
+  @Override
+  public int fetchPartitionCount(long timeoutMillis) {
+    return _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size();
+  }
+
+  @Override
+  public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)
+      throws TimeoutException {
+    Preconditions.checkNotNull(offsetCriteria);
+    if (offsetCriteria.isLargest()) {
+      return _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+          .get(_topicPartition);
+    } else if (offsetCriteria.isSmallest()) {
+      return _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+          .get(_topicPartition);
+    } else {
+      throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria.toString());
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    super.close();
+  }
+}
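
Reviewer note: the metadata provider above maps Pinot's offset criteria onto the Kafka 2.0 consumer API: endOffsets for "largest" and beginningOffsets for "smallest". A hedged sketch of those two calls against a hypothetical local broker, topic, and partition:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class OffsetLookupSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        TopicPartition tp = new TopicPartition("myTopic", 0); // hypothetical topic and partition
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
          consumer.assign(Collections.singletonList(tp));
          // "largest" offset criteria -> end offsets; "smallest" -> beginning offsets
          long largest = consumer.endOffsets(Collections.singletonList(tp), Duration.ofMillis(10_000)).get(tp);
          long smallest = consumer.beginningOffsets(Collections.singletonList(tp), Duration.ofMillis(10_000)).get(tp);
          System.out.println("smallest=" + smallest + ", largest=" + largest);
        }
      }
    }
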
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamConfigProperties.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamConfigProperties.java
similarity index 76%
rename from pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamConfigProperties.java
rename to pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamConfigProperties.java
index 3c45d6e..ed27dfc 100644
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamConfigProperties.java
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamConfigProperties.java
@@ -25,19 +25,22 @@ import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
 /**
  * Property key definitions for all kafka stream related properties
  */
-public class KafkaStreamConfigProperties {
+public class Kafka2StreamConfigProperties {
   public static final String DOT_SEPARATOR = ".";
-  public static final String STREAM_TYPE = "kafka";
+  public static final String KAFKA_CONSUMER_PROP_PREFIX = "kafka.consumer.prop";
+
+  /**
+   * Helper method to create a property string for kafka stream
+   * @param property
+   * @return
+   */
+  public static String constructStreamProperty(String property) {
+    return Joiner.on(DOT_SEPARATOR).join(StreamConfigProperties.STREAM_PREFIX, property);
+  }
 
   public static class HighLevelConsumer {
-    public static final String KAFKA_HLC_ZK_CONNECTION_STRING = "kafka.hlc.zk.connect.string";
-    public static final String ZK_SESSION_TIMEOUT_MS = "zookeeper.session.timeout.ms";
-    public static final String ZK_CONNECTION_TIMEOUT_MS = "zookeeper.connection.timeout.ms";
-    public static final String ZK_SYNC_TIME_MS = "zookeeper.sync.time.ms";
-    public static final String REBALANCE_MAX_RETRIES = "rebalance.max.retries";
-    public static final String REBALANCE_BACKOFF_MS = "rebalance.backoff.ms";
+    public static final String KAFKA_HLC_BOOTSTRAP_SERVER = "kafka.hlc.bootstrap.server";
     public static final String AUTO_COMMIT_ENABLE = "auto.commit.enable";
-    public static final String AUTO_OFFSET_RESET = "auto.offset.reset";
   }
 
   public static class LowLevelConsumer {
@@ -50,16 +53,5 @@ public class KafkaStreamConfigProperties {
     public static final String KAFKA_FETCHER_MIN_BYTES = "kafka.fetcher.minBytes";
     public static final int KAFKA_FETCHER_MIN_BYTES_DEFAULT = 100000;
   }
-
-  public static final String KAFKA_CONSUMER_PROP_PREFIX = "kafka.consumer.prop";
-
-  /**
-   * Helper method to create a property string for kafka stream
-   * @param property
-   * @return
-   */
-  public static String constructStreamProperty(String property) {
-    return Joiner.on(DOT_SEPARATOR).join(StreamConfigProperties.STREAM_PREFIX, property);
-  }
 }
 
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamLevelConsumer.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamLevelConsumer.java
new file mode 100644
index 0000000..4bbf975
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamLevelConsumer.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.yammer.metrics.core.Meter;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.core.realtime.stream.StreamDecoderProvider;
+import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
+import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An implementation of a {@link StreamLevelConsumer} which consumes from the kafka stream
+ */
+public class Kafka2StreamLevelConsumer implements StreamLevelConsumer {
+
+  private StreamMessageDecoder _messageDecoder;
+  private Logger INSTANCE_LOGGER;
+
+  private String _clientId;
+  private String _tableAndStreamName;
+
+  private StreamConfig _streamConfig;
+  private Kafka2HighLevelStreamConfig _kafka2HighLevelStreamConfig;
+
+  private KafkaConsumer<byte[], byte[]> consumer;
+  private ConsumerRecords<byte[], byte[]> consumerRecords;
+  private Iterator<ConsumerRecord<byte[], byte[]>> kafkaIterator;
+  private Map<Integer, Long> consumerOffsets = new HashMap<>(); // tracking current consumed records offsets.
+
+  private long lastLogTime = 0;
+  private long lastCount = 0;
+  private long currentCount = 0L;
+
+  private ServerMetrics _serverMetrics;
+  private Meter tableAndStreamRowsConsumed = null;
+  private Meter tableRowsConsumed = null;
+
+  public Kafka2StreamLevelConsumer(String clientId, String tableName, StreamConfig streamConfig, Schema schema,
+      InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) {
+    _clientId = clientId;
+    _streamConfig = streamConfig;
+    _kafka2HighLevelStreamConfig = new Kafka2HighLevelStreamConfig(streamConfig, tableName, instanceZKMetadata);
+    _serverMetrics = serverMetrics;
+
+    _messageDecoder = StreamDecoderProvider.create(streamConfig, schema);
+
+    _tableAndStreamName = tableName + "-" + streamConfig.getTopicName();
+    INSTANCE_LOGGER = LoggerFactory
+        .getLogger(Kafka2StreamLevelConsumer.class.getName() + "_" + tableName + "_" + streamConfig.getTopicName());
+  }
+
+  @Override
+  public void start()
+      throws Exception {
+    consumer = Kafka2ConsumerManager.acquireKafkaConsumerForConfig(_kafka2HighLevelStreamConfig);
+  }
+
+  private void updateKafkaIterator() {
+    consumerRecords = consumer.poll(Duration.ofMillis(_streamConfig.getFetchTimeoutMillis()));
+    kafkaIterator = consumerRecords.iterator();
+  }
+
+  @Override
+  public GenericRow next(GenericRow destination) {
+    if (!kafkaIterator.hasNext()) {
+      updateKafkaIterator();
+    }
+    if (kafkaIterator.hasNext()) {
+      try {
+        final ConsumerRecord<byte[], byte[]> record = kafkaIterator.next();
+        updateOffsets(record.partition(), record.offset());
+        destination = _messageDecoder.decode(record.value(), destination);
+        tableAndStreamRowsConsumed = _serverMetrics
+            .addMeteredTableValue(_tableAndStreamName, ServerMeter.REALTIME_ROWS_CONSUMED, 1L,
+                tableAndStreamRowsConsumed);
+        tableRowsConsumed =
+            _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L, tableRowsConsumed);
+
+        ++currentCount;
+
+        final long now = System.currentTimeMillis();
+        // Log every minute or 100k events
+        if (now - lastLogTime > 60000 || currentCount - lastCount >= 100000) {
+          if (lastCount == 0) {
+            INSTANCE_LOGGER.info("Consumed {} events from kafka stream {}", currentCount, _streamConfig.getTopicName());
+          } else {
+            INSTANCE_LOGGER.info("Consumed {} events from kafka stream {} (rate:{}/s)", currentCount - lastCount,
+                _streamConfig.getTopicName(), (float) (currentCount - lastCount) * 1000 / (now - lastLogTime));
+          }
+          lastCount = currentCount;
+          lastLogTime = now;
+        }
+        return destination;
+      } catch (Exception e) {
+        INSTANCE_LOGGER.warn("Caught exception while consuming events", e);
+        _serverMetrics.addMeteredTableValue(_tableAndStreamName, ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L);
+        _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L);
+        throw e;
+      }
+    }
+    return null;
+  }
+
+  private void updateOffsets(int partition, long offset) {
+    consumerOffsets.put(partition, offset);
+  }
+
+  @Override
+  public void commit() {
+    consumer.commitSync(getOffsetsMap());
+    consumerOffsets.clear();
+    _serverMetrics.addMeteredTableValue(_tableAndStreamName, ServerMeter.REALTIME_OFFSET_COMMITS, 1L);
+    _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_OFFSET_COMMITS, 1L);
+  }
+
+  private Map<TopicPartition, OffsetAndMetadata> getOffsetsMap() {
+    Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
+    for (Integer partition : consumerOffsets.keySet()) {
+      offsetsMap.put(new TopicPartition(_streamConfig.getTopicName(), partition),
+          new OffsetAndMetadata(consumerOffsets.get(partition)));
+    }
+    return offsetsMap;
+  }
+
+  @Override
+  public void shutdown()
+      throws Exception {
+    if (consumer != null) {
+      Kafka2ConsumerManager.releaseKafkaConsumer(consumer);
+      consumer = null;
+    }
+  }
+}
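
Reviewer note: commit() above translates the per-partition offsets tracked during consumption into the map that commitSync expects. Kafka's committed offset conventionally points at the next record to consume; the sketch below uses that offset + 1 convention for illustration (the code above commits the last consumed offset as-is), and the topic name and offsets are made up:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class CommitMapSketch {
      public static void main(String[] args) {
        String topic = "myTopic"; // hypothetical topic name
        Map<Integer, Long> consumedOffsets = new HashMap<>();
        consumedOffsets.put(0, 1234L);
        consumedOffsets.put(1, 987L);

        Map<TopicPartition, OffsetAndMetadata> commitMap = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : consumedOffsets.entrySet()) {
          // +1 because the committed offset is the position of the next record to be consumed
          commitMap.put(new TopicPartition(topic, entry.getKey()), new OffsetAndMetadata(entry.getValue() + 1));
        }
        System.out.println(commitMap);
      }
    }
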
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaAvroMessageDecoder.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaAvroMessageDecoder.java
new file mode 100644
index 0000000..5e09faf
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaAvroMessageDecoder.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.retry.RetryPolicies;
+import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.realtime.stream.AvroRecordToPinotRowGenerator;
+import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@NotThreadSafe
+public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaAvroMessageDecoder.class);
+
+  private static final String SCHEMA_REGISTRY_REST_URL = "schema.registry.rest.url";
+  private static final String SCHEMA_REGISTRY_SCHEMA_NAME = "schema.registry.schema.name";
+  private org.apache.avro.Schema defaultAvroSchema;
+  private MD5AvroSchemaMap md5ToAvroSchemaMap;
+
+  // A global cache for schemas across all threads.
+  private static final Map<String, org.apache.avro.Schema> globalSchemaCache = new HashMap<>();
+  // Suffix for getting the latest schema
+  private static final String LATEST = "-latest";
+
+  // Reusable byte[] to read MD5 from payload. This is OK as this class is used only by a single thread.
+  private final byte[] reusableMD5Bytes = new byte[SCHEMA_HASH_LENGTH];
+
+  private DecoderFactory decoderFactory;
+  private AvroRecordToPinotRowGenerator avroRecordConverter;
+
+  private static final int MAGIC_BYTE_LENGTH = 1;
+  private static final int SCHEMA_HASH_LENGTH = 16;
+  private static final int HEADER_LENGTH = MAGIC_BYTE_LENGTH + SCHEMA_HASH_LENGTH;
+
+  private static final int SCHEMA_HASH_START_OFFSET = MAGIC_BYTE_LENGTH;
+
+  private static final int MAXIMUM_SCHEMA_FETCH_RETRY_COUNT = 5;
+  private static final int MINIMUM_SCHEMA_FETCH_RETRY_TIME_MILLIS = 500;
+  private static final float SCHEMA_FETCH_RETRY_EXPONENTIAL_BACKOFF_FACTOR = 2.0f;
+
+  private String[] schemaRegistryUrls;
+
+  @Override
+  public void init(Map<String, String> props, Schema indexingSchema, String topicName)
+      throws Exception {
+    schemaRegistryUrls = parseSchemaRegistryUrls(props.get(SCHEMA_REGISTRY_REST_URL));
+
+    for (int i = 0; i < schemaRegistryUrls.length; i++) {
+      schemaRegistryUrls[i] = StringUtils.chomp(schemaRegistryUrls[i], "/");
+    }
+
+    String avroSchemaName = topicName;
+    if (props.containsKey(SCHEMA_REGISTRY_SCHEMA_NAME) && props.get(SCHEMA_REGISTRY_SCHEMA_NAME) != null && !props
+        .get(SCHEMA_REGISTRY_SCHEMA_NAME).isEmpty()) {
+      avroSchemaName = props.get(SCHEMA_REGISTRY_SCHEMA_NAME);
+    }
+    // With the logic below, we may not set defaultAvroSchema to be the latest one every time.
+    // The schema is fetched once when the machine starts; until the next restart, the latest schema is
+    // not fetched.
+    // But then we always pay attention to the exact MD5 hash and attempt to fetch the schema for that particular hash
+    // before decoding an incoming kafka event. We use defaultAvroSchema only if the fetch for the particular MD5 fails,
+    // but then we will retry that fetch on every event in case of failure.
+    synchronized (globalSchemaCache) {
+      final String hashKey = avroSchemaName + LATEST;
+      defaultAvroSchema = globalSchemaCache.get(hashKey);
+      if (defaultAvroSchema == null) {
+        defaultAvroSchema = fetchSchema("/latest_with_type=" + avroSchemaName);
+        globalSchemaCache.put(hashKey, defaultAvroSchema);
+        LOGGER.info("Populated schema cache with schema for {}", hashKey);
+      }
+    }
+    this.avroRecordConverter = new AvroRecordToPinotRowGenerator(indexingSchema);
+    this.decoderFactory = new DecoderFactory();
+    md5ToAvroSchemaMap = new MD5AvroSchemaMap();
+  }
+
+  @Override
+  public GenericRow decode(byte[] payload, GenericRow destination) {
+    return decode(payload, 0, payload.length, destination);
+  }
+
+  @Override
+  public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
+    if (payload == null || payload.length == 0 || length == 0) {
+      return null;
+    }
+
+    System.arraycopy(payload, SCHEMA_HASH_START_OFFSET + offset, reusableMD5Bytes, 0, SCHEMA_HASH_LENGTH);
+
+    boolean schemaUpdateFailed = false;
+    org.apache.avro.Schema schema = md5ToAvroSchemaMap.getSchema(reusableMD5Bytes);
+    if (schema == null) {
+      // We will get here for the first row consumed in the segment, and every row that has a schema ID that is
+      // not yet in md5ToAvroSchemaMap.
+      synchronized (globalSchemaCache) {
+        final String hashKey = hex(reusableMD5Bytes);
+        schema = globalSchemaCache.get(hashKey);
+        if (schema == null) {
+          // We will get here only if no partition of the table has populated the global schema cache.
+          // In that case, one of the consumers will fetch the schema and populate the cache, and the others
+          // should find it in the cache and populate their own md5ToAvroSchemaMap from it.
+          final String schemaUri = "/id=" + hex(reusableMD5Bytes);
+          try {
+            schema = fetchSchema(schemaUri);
+            globalSchemaCache.put(hashKey, schema);
+            md5ToAvroSchemaMap.addSchema(reusableMD5Bytes, schema);
+          } catch (Exception e) {
+            schema = defaultAvroSchema;
+            LOGGER
+                .error("Error fetching schema using url {}. Attempting to continue with previous schema", schemaUri, e);
+            schemaUpdateFailed = true;
+          }
+        } else {
+          LOGGER.info("Found schema for {} in cache", hashKey);
+          md5ToAvroSchemaMap.addSchema(reusableMD5Bytes, schema);
+        }
+      }
+    }
+    DatumReader<Record> reader = new GenericDatumReader<Record>(schema);
+    try {
+      Record avroRecord = reader.read(null,
+          decoderFactory.createBinaryDecoder(payload, HEADER_LENGTH + offset, length - HEADER_LENGTH, null));
+      return avroRecordConverter.transform(avroRecord, destination);
+    } catch (IOException e) {
+      LOGGER.error("Caught exception while reading message using schema {}{}",
+          (schema == null ? "null" : schema.getName()),
+          (schemaUpdateFailed ? "(possibly due to schema update failure)" : ""), e);
+      return null;
+    }
+  }
+
+  private String hex(byte[] bytes) {
+    StringBuilder builder = new StringBuilder(2 * bytes.length);
+    for (byte aByte : bytes) {
+      String hexString = Integer.toHexString(0xFF & aByte);
+      if (hexString.length() < 2) {
+        hexString = "0" + hexString;
+      }
+      builder.append(hexString);
+    }
+    return builder.toString();
+  }
+
+  private static class SchemaFetcher implements Callable<Boolean> {
+    private org.apache.avro.Schema _schema;
+    private URL url;
+    private boolean _isSuccessful = false;
+
+    SchemaFetcher(URL url) {
+      this.url = url;
+    }
+
+    @Override
+    public Boolean call()
+        throws Exception {
+      try {
+        URLConnection conn = url.openConnection();
+        conn.setConnectTimeout(15000);
+        conn.setReadTimeout(15000);
+        LOGGER.info("Fetching schema using url {}", url.toString());
+
+        StringBuilder queryResp = new StringBuilder();
+        try (BufferedReader reader = new BufferedReader(
+            new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
+          for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+            queryResp.append(line);
+          }
+        }
+
+        _schema = org.apache.avro.Schema.parse(queryResp.toString());
+
+        LOGGER.info("Schema fetch succeeded on url {}", url.toString());
+        return Boolean.TRUE;
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while fetching schema", e);
+        return Boolean.FALSE;
+      }
+    }
+
+    public org.apache.avro.Schema getSchema() {
+      return _schema;
+    }
+  }
+
+  private org.apache.avro.Schema fetchSchema(String reference)
+      throws Exception {
+    SchemaFetcher schemaFetcher = new SchemaFetcher(makeRandomUrl(reference));
+    RetryPolicies
+        .exponentialBackoffRetryPolicy(MAXIMUM_SCHEMA_FETCH_RETRY_COUNT, MINIMUM_SCHEMA_FETCH_RETRY_TIME_MILLIS,
+            SCHEMA_FETCH_RETRY_EXPONENTIAL_BACKOFF_FACTOR).attempt(schemaFetcher);
+    return schemaFetcher.getSchema();
+  }
+
+  /**
+   * Private class for encapsulating MD5 to Avro schema mapping.
+   * <ul>
+   *   <li> Maintains two lists, one for md5s and another for schema. </li>
+   *   <li> MD5 at index i in the MD5 list, corresponds to Schema at index i in the schema list. </li>
+   * </ul>
+   */
+  private static class MD5AvroSchemaMap {
+    private List<byte[]> md5s;
+    private List<org.apache.avro.Schema> schemas;
+
+    /**
+     * Constructor for the class.
+     */
+    private MD5AvroSchemaMap() {
+      md5s = new ArrayList<>();
+      schemas = new ArrayList<>();
+    }
+
+    /**
+     * Returns the Avro schema corresponding to the given MD5.
+     *
+     * @param md5ForSchema MD5 for which to get the avro schema.
+     * @return Avro schema for the given MD5.
+     */
+    private org.apache.avro.Schema getSchema(byte[] md5ForSchema) {
+      for (int i = 0; i < md5s.size(); i++) {
+        if (Arrays.equals(md5s.get(i), md5ForSchema)) {
+          return schemas.get(i);
+        }
+      }
+      return null;
+    }
+
+    /**
+     * Adds mapping between MD5 and Avro schema.
+     * Caller to ensure that addSchema is called only once per MD5-Schema pair.
+     *
+     * @param md5 MD5 for the Schema
+     * @param schema Avro Schema
+     */
+    private void addSchema(byte[] md5, org.apache.avro.Schema schema) {
+      md5s.add(Arrays.copyOf(md5, md5.length));
+      schemas.add(schema);
+    }
+  }
+
+  protected URL makeRandomUrl(String reference)
+      throws MalformedURLException {
+    Random rand = new Random();
+    int randomInteger = rand.nextInt(schemaRegistryUrls.length);
+    return new URL(schemaRegistryUrls[randomInteger] + reference);
+  }
+
+  protected String[] parseSchemaRegistryUrls(String schemaConfig) {
+    return schemaConfig.split(",");
+  }
+}
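
Reviewer note: the decoder above assumes each Kafka payload is laid out as a 1-byte magic header, a 16-byte MD5 of the Avro schema, and then the Avro-encoded record. A short standalone sketch that extracts and hex-encodes the schema MD5 from such a payload; the payload bytes are fabricated:

    public class SchemaHashSketch {
      private static final int MAGIC_BYTE_LENGTH = 1;
      private static final int SCHEMA_HASH_LENGTH = 16;

      static String schemaHashHex(byte[] payload) {
        StringBuilder builder = new StringBuilder(2 * SCHEMA_HASH_LENGTH);
        for (int i = MAGIC_BYTE_LENGTH; i < MAGIC_BYTE_LENGTH + SCHEMA_HASH_LENGTH; i++) {
          builder.append(String.format("%02x", payload[i] & 0xFF));
        }
        return builder.toString();
      }

      public static void main(String[] args) {
        byte[] payload = new byte[32];   // fabricated payload: magic byte + hash + a few body bytes
        payload[0] = 0x0;                // magic byte
        for (int i = 0; i < 16; i++) {
          payload[1 + i] = (byte) i;     // fake MD5 bytes 00..0f
        }
        System.out.println(schemaHashHex(payload)); // 000102030405060708090a0b0c0d0e0f
      }
    }
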
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConnectionHandler.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConnectionHandler.java
deleted file mode 100644
index 802062f..0000000
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConnectionHandler.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.realtime.impl.kafka2;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.BytesDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Properties;
-
-public abstract class KafkaConnectionHandler {
-
-    protected final KafkaPartitionLevelStreamConfig _config;
-    protected final int _partition;
-    protected final String _topic;
-    protected final Consumer<String, byte[]> _consumer;
-    protected final TopicPartition _topicPartition;
-
-    public KafkaConnectionHandler(StreamConfig streamConfig, int partition) {
-        _config = new KafkaPartitionLevelStreamConfig(streamConfig);
-        _partition = partition;
-        _topic = _config.getKafkaTopicName();
-        Properties consumerProp = new Properties();
-        consumerProp.putAll(streamConfig.getStreamConfigsMap());
-        consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
-        consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
-        _consumer = new KafkaConsumer<>(consumerProp);
-        _topicPartition = new TopicPartition(_topic, _partition);
-        _consumer.assign(Collections.singletonList(_topicPartition));
-
-    }
-
-    public void close() throws IOException {
-        _consumer.close();
-    }
-
-
-}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaJSONMessageDecoder.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaJSONMessageDecoder.java
new file mode 100644
index 0000000..8d1fd96
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaJSONMessageDecoder.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class KafkaJSONMessageDecoder implements StreamMessageDecoder<byte[]> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaJSONMessageDecoder.class);
+
+  private Schema schema;
+
+  @Override
+  public void init(Map<String, String> props, Schema indexingSchema, String topicName)
+      throws Exception {
+    this.schema = indexingSchema;
+  }
+
+  @Override
+  public GenericRow decode(byte[] payload, GenericRow destination) {
+    try {
+      JsonNode message = JsonUtils.bytesToJsonNode(payload);
+      for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+        String column = fieldSpec.getName();
+        destination.putField(column, JsonUtils.extractValue(message.get(column), fieldSpec));
+      }
+      return destination;
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while decoding row, discarding row.", e);
+      return null;
+    }
+  }
+
+  @Override
+  public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
+    return decode(Arrays.copyOfRange(payload, offset, offset + length), destination);
+  }
+}
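
Reviewer note: the JSON decoder above extracts one value per schema column from each incoming message. A minimal standalone sketch of that pattern using Jackson directly; the column names and payload are made up, and the real decoder delegates per-column conversion to JsonUtils.extractValue with the column's FieldSpec:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class JsonDecodeSketch {
      public static void main(String[] args) throws Exception {
        byte[] payload = "{\"campaign\":\"summer\",\"clicks\":42}".getBytes(StandardCharsets.UTF_8);
        List<String> columns = Arrays.asList("campaign", "clicks", "impressions"); // hypothetical schema columns

        JsonNode message = new ObjectMapper().readTree(payload);
        Map<String, Object> row = new LinkedHashMap<>();
        for (String column : columns) {
          // Missing columns come back as null here; the decoder above applies FieldSpec-aware extraction instead
          JsonNode node = message.get(column);
          row.put(column, node == null || node.isNull() ? null : node.asText());
        }
        System.out.println(row); // {campaign=summer, clicks=42, impressions=null}
      }
    }
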
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaMessageBatch.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaMessageBatch.java
deleted file mode 100644
index 22aa683..0000000
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaMessageBatch.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.realtime.impl.kafka2;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.pinot.core.realtime.stream.MessageBatch;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class KafkaMessageBatch implements MessageBatch<byte[]> {
-
-    private List<MessageAndOffset> messageList = new ArrayList<>();
-
-    public KafkaMessageBatch(Iterable<ConsumerRecord<String, byte[]>> iterable) {
-        for (ConsumerRecord<String, byte[]> record : iterable) {
-            messageList.add(new MessageAndOffset(record.value(), record.offset()));
-        }
-    }
-
-    @Override
-    public int getMessageCount() {
-        return messageList.size();
-    }
-
-    @Override
-    public byte[] getMessageAtIndex(int index) {
-        return messageList.get(index).getMessage().array();
-    }
-
-    @Override
-    public int getMessageOffsetAtIndex(int index) {
-        return messageList.get(index).getMessage().arrayOffset();
-    }
-
-    @Override
-    public int getMessageLengthAtIndex(int index) {
-        return messageList.get(index).getMessage().array().length;
-    }
-
-    @Override
-    public long getNextStreamMessageOffsetAtIndex(int index) {
-        return messageList.get(index).getNextOffset();
-    }
-
-    public Iterable<MessageAndOffset> iterable() {
-        return messageList;
-    }
-}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionConsumer.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionConsumer.java
deleted file mode 100644
index de3295d..0000000
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionConsumer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.realtime.impl.kafka2;
-
-import org.apache.kafka.clients.consumer.*;
-
-import org.apache.pinot.core.realtime.stream.MessageBatch;
-import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeoutException;
-
-public class KafkaPartitionConsumer extends KafkaConnectionHandler implements PartitionLevelConsumer {
-
-
-    public KafkaPartitionConsumer(StreamConfig streamConfig, int partition) {
-        super(streamConfig, partition);
-    }
-
-    @Override
-    public MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis) throws TimeoutException {
-        _consumer.seek(_topicPartition, startOffset);
-
-        ConsumerRecords<String, byte[]> consumerRecords = _consumer.poll(null);
-        List<ConsumerRecord<String, byte[]>> records = consumerRecords.records(_topicPartition);
-        return new KafkaMessageBatch(records);
-    }
-
-    @Override
-    public void close() throws IOException {
-        super.close();
-    }
-}
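
Note on the removed fetchMessages above: it seeks to the requested start offset and then calls _consumer.poll(null), while the Kafka 2.0 client expects a timeout here (the Duration overload). A minimal sketch of the same seek-and-poll flow against the 2.0 consumer API, with illustrative names that are not part of this commit:

    import java.time.Duration;
    import java.util.List;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;

    class PartitionFetchSketch {
      // Assumes the consumer is already assigned to this partition.
      static List<ConsumerRecord<String, byte[]>> fetch(Consumer<String, byte[]> consumer,
          TopicPartition partition, long startOffset, int timeoutMillis) {
        consumer.seek(partition, startOffset);                 // position at the requested offset
        ConsumerRecords<String, byte[]> polled =
            consumer.poll(Duration.ofMillis(timeoutMillis));   // bounded wait instead of poll(null)
        return polled.records(partition);                      // records for this partition only
      }
    }
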
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfig.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfig.java
deleted file mode 100644
index c154a38..0000000
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfig.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.realtime.impl.kafka2;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang.StringUtils;
-import org.apache.pinot.common.utils.EqualityUtils;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-
-import java.util.Map;
-import java.util.Properties;
-
-public class KafkaPartitionLevelStreamConfig {
-
-    private final String _kafkaTopicName;
-    private final String _bootstrapHosts;
-    private final int _kafkaBufferSize;
-    private final int _kafkaSocketTimeout;
-    private final int _kafkaFetcherSizeBytes;
-    private final int _kafkaFetcherMinBytes;
-    private final Map<String, String> _streamConfigMap;
-
-    /**
-     * Builds a wrapper around {@link StreamConfig} to fetch kafka partition level consumer related configs
-     * @param streamConfig
-     */
-    public KafkaPartitionLevelStreamConfig(StreamConfig streamConfig) {
-        _streamConfigMap = streamConfig.getStreamConfigsMap();
-
-        _kafkaTopicName = streamConfig.getTopicName();
-
-        String llcBrokerListKey = KafkaStreamConfigProperties
-                .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST);
-        String llcBufferKey = KafkaStreamConfigProperties
-                .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE);
-        String llcTimeoutKey = KafkaStreamConfigProperties
-                .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT);
-        String fetcherSizeKey = KafkaStreamConfigProperties
-                .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_SIZE_BYTES);
-        String fetcherMinBytesKey = KafkaStreamConfigProperties
-                .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES);
-        _bootstrapHosts = _streamConfigMap.get(llcBrokerListKey);
-        _kafkaBufferSize = getIntConfigWithDefault(_streamConfigMap, llcBufferKey,
-                KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT);
-        _kafkaSocketTimeout = getIntConfigWithDefault(_streamConfigMap, llcTimeoutKey,
-                KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT);
-        _kafkaFetcherSizeBytes = getIntConfigWithDefault(_streamConfigMap, fetcherSizeKey, _kafkaBufferSize);
-        _kafkaFetcherMinBytes = getIntConfigWithDefault(_streamConfigMap, fetcherMinBytesKey,
-                KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT);
-        Preconditions.checkNotNull(_bootstrapHosts,
-                "Must specify kafka brokers list " + llcBrokerListKey + " in case of low level kafka consumer");
-    }
-
-    public String getKafkaTopicName() {
-        return _kafkaTopicName;
-    }
-
-    public String getBootstrapHosts() {
-        return _bootstrapHosts;
-    }
-
-    public int getKafkaBufferSize() {
-        return _kafkaBufferSize;
-    }
-
-    public int getKafkaSocketTimeout() {
-        return _kafkaSocketTimeout;
-    }
-
-    public int getKafkaFetcherSizeBytes() {
-        return _kafkaFetcherSizeBytes;
-    }
-
-    public int getKafkaFetcherMinBytes() {
-        return _kafkaFetcherMinBytes;
-    }
-
-    private int getIntConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) {
-        String stringValue = configMap.get(key);
-        try {
-            if (StringUtils.isNotEmpty(stringValue)) {
-                return Integer.parseInt(stringValue);
-            }
-            return defaultValue;
-        } catch (NumberFormatException ex) {
-            return defaultValue;
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "KafkaLowLevelStreamConfig{" + "_kafkaTopicName='" + _kafkaTopicName + '\'' + ", _bootstrapHosts='"
-                + _bootstrapHosts + '\'' + ", _kafkaBufferSize='" + _kafkaBufferSize + '\'' + ", _kafkaSocketTimeout='"
-                + _kafkaSocketTimeout + '\'' + ", _kafkaFetcherSizeBytes='" + _kafkaFetcherSizeBytes + '\'' + ", _kafkaFetcherMinBytes='"
-                + _kafkaFetcherMinBytes + '\'' + '}';
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (EqualityUtils.isSameReference(this, o)) {
-            return true;
-        }
-
-        if (EqualityUtils.isNullOrNotSameClass(this, o)) {
-            return false;
-        }
-
-        KafkaPartitionLevelStreamConfig that = (KafkaPartitionLevelStreamConfig) o;
-
-        return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName) && EqualityUtils
-                .isEqual(_bootstrapHosts, that._bootstrapHosts) && EqualityUtils
-                .isEqual(_kafkaBufferSize, that._kafkaBufferSize) && EqualityUtils
-                .isEqual(_kafkaSocketTimeout, that._kafkaSocketTimeout) && EqualityUtils
-                .isEqual(_kafkaFetcherSizeBytes, that._kafkaFetcherSizeBytes) && EqualityUtils
-                .isEqual(_kafkaFetcherMinBytes, that._kafkaFetcherMinBytes);
-    }
-
-    @Override
-    public int hashCode() {
-        int result = EqualityUtils.hashCodeOf(_kafkaTopicName);
-        result = EqualityUtils.hashCodeOf(result, _bootstrapHosts);
-        result = EqualityUtils.hashCodeOf(result, _kafkaBufferSize);
-        result = EqualityUtils.hashCodeOf(result, _kafkaSocketTimeout);
-        result = EqualityUtils.hashCodeOf(result, _kafkaFetcherSizeBytes);
-        result = EqualityUtils.hashCodeOf(result, _kafkaFetcherMinBytes);
-        return result;
-    }
-}
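
The removed wrapper above reads its values from the table's stream configuration map and falls back to defaults when an entry is missing, empty, or not a number. A hypothetical property map covering the keys it consumes (values are examples only; the tests later in this commit show the full map, including consumer type, factory class and decoder class):

    import java.util.HashMap;
    import java.util.Map;

    class LowLevelStreamConfigExample {
      static Map<String, String> kafkaLowLevelProps() {
        Map<String, String> cfg = new HashMap<>();
        cfg.put("streamType", "kafka");
        cfg.put("stream.kafka.topic.name", "myTopic");          // topic to consume
        cfg.put("stream.kafka.broker.list", "localhost:9092");  // required for the low-level consumer
        cfg.put("stream.kafka.buffer.size", "524288");          // optional, bytes
        cfg.put("stream.kafka.socket.timeout", "10000");        // optional, milliseconds
        cfg.put("stream.kafka.fetcher.size", "1048576");        // optional, defaults to the buffer size
        cfg.put("stream.kafka.fetcher.minBytes", "100000");     // optional
        return cfg;
      }
    }
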
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamMetadataProvider.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamMetadataProvider.java
deleted file mode 100644
index 3871d85..0000000
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamMetadataProvider.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.realtime.impl.kafka2;
-
-import com.google.common.base.Preconditions;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.DescribeTopicsResult;
-import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.pinot.core.realtime.stream.OffsetCriteria;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
-
-import javax.annotation.Nonnull;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-
-public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implements StreamMetadataProvider {
-
-    private AdminClient _adminClient;
-
-    public KafkaStreamMetadataProvider(StreamConfig streamConfig, int partition) {
-        super(streamConfig, partition);
-        final Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
-        _adminClient = AdminClient.create(props);
-    }
-
-    @Override
-    public int fetchPartitionCount(long timeoutMillis) {
-        DescribeTopicsResult result = _adminClient.describeTopics(Collections.singletonList(_config.getKafkaTopicName()));
-        Map<String, KafkaFuture<TopicDescription>> values = result.values();
-        KafkaFuture<TopicDescription> topicDescription = values.get(_config.getKafkaTopicName());
-        try {
-            return topicDescription.get().partitions().size();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new RuntimeException("");
-        }
-    }
-
-    @Override
-    public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) throws TimeoutException {
-
-        Preconditions.checkNotNull(offsetCriteria);
-        if (offsetCriteria.isLargest()) {
-            return _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis)).get(_topicPartition);
-        } else if (offsetCriteria.isSmallest()) {
-            return _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis)).get(_topicPartition);
-        } else {
-            throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria.toString());
-        }
-
-    }
-
-    @Override
-    public void close() throws IOException {
-        super.close();
-    }
-}
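
The removed fetchPartitionCount above waits on the AdminClient future without a bound and rethrows with an empty message, which drops the underlying cause. An illustrative variant, not part of this commit, that applies the caller's timeout and keeps the cause:

    import java.util.Collections;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.TopicDescription;

    class PartitionCountSketch {
      static int fetchPartitionCount(AdminClient adminClient, String topic, long timeoutMillis) {
        try {
          TopicDescription description = adminClient
              .describeTopics(Collections.singletonList(topic))
              .values().get(topic)
              .get(timeoutMillis, TimeUnit.MILLISECONDS);        // bound the wait by the caller's timeout
          return description.partitions().size();
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
          throw new RuntimeException("Failed to fetch partition count for topic " + topic, e);
        }
      }
    }
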
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/MessageAndOffset.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/MessageAndOffset.java
index 0dea267..b5bdaba 100644
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/MessageAndOffset.java
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/MessageAndOffset.java
@@ -20,30 +20,34 @@ package org.apache.pinot.core.realtime.impl.kafka2;
 
 import java.nio.ByteBuffer;
 
+
 public class MessageAndOffset {
 
-    private ByteBuffer _message;
-    private long _offset;
+  private ByteBuffer _message;
+  private long _offset;
+
+  public MessageAndOffset(byte[] message, long offset) {
+    this(ByteBuffer.wrap(message), offset);
+  }
 
-    public MessageAndOffset(byte[] message, long offset) {
-        _message = ByteBuffer.wrap(message);
-        _offset = offset;
-    }
+  public MessageAndOffset(ByteBuffer message, long offset) {
+    _message = message;
+    _offset = offset;
+  }
 
-    public MessageAndOffset(ByteBuffer message, long offset) {
-        _message = message;
-        _offset = offset;
-    }
+  public ByteBuffer getMessage() {
+    return _message;
+  }
 
-    public ByteBuffer getMessage() {
-        return _message;
-    }
+  public long getOffset() {
+    return _offset;
+  }
 
-    public long getOffset() {
-        return _offset;
-    }
+  public long getNextOffset() {
+    return getOffset() + 1;
+  }
 
-    public long getNextOffset() {
-        return _offset + 1;
-    }
+  public int payloadSize() {
+    return getMessage().array().length;
+  }
 }
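
For reference, this reformatted MessageAndOffset is what KafkaMessageBatch (removed earlier in this diff) wraps each ConsumerRecord into: the payload bytes, the byte offset within the backing array, and the Kafka offset plus one as the point to resume from. A rough usage sketch, assuming a non-empty batch and the same package as MessageAndOffset:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    class MessageAndOffsetSketch {
      // Wrap a batch of records and report where the next fetch should start.
      static long nextFetchOffset(Iterable<ConsumerRecord<String, byte[]>> records) {
        List<MessageAndOffset> batch = new ArrayList<>();
        for (ConsumerRecord<String, byte[]> record : records) {
          batch.add(new MessageAndOffset(record.value(), record.offset()));
        }
        MessageAndOffset last = batch.get(batch.size() - 1);
        System.out.println("last payload size in bytes: " + last.payloadSize());
        return last.getNextOffset();                             // Kafka offset of the last record + 1
      }
    }
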
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConsumerTest.java b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConsumerTest.java
new file mode 100644
index 0000000..cc28127
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConsumerTest.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pinot.core.realtime.impl.kafka2.utils.MiniKafkaCluster;
+import org.apache.pinot.core.realtime.stream.OffsetCriteria;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for the KafkaPartitionLevelConsumer.
+ */
+public class KafkaPartitionLevelConsumerTest {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumerTest.class);
+  private static final long STABILIZE_SLEEP_DELAYS = 3000;
+  private static final String TEST_TOPIC_1 = "foo";
+  private static final String TEST_TOPIC_2 = "bar";
+  private static final int NUM_MSG_PRODUCED = 1000;
+
+  private static MiniKafkaCluster kafkaCluster;
+  private static String brokerAddress;
+
+  @BeforeClass
+  public static void setup()
+      throws Exception {
+    kafkaCluster = new MiniKafkaCluster.Builder().newServer("0").build();
+    LOGGER.info("Trying to start MiniKafkaCluster");
+    kafkaCluster.start();
+    brokerAddress = getKafkaBroker();
+    kafkaCluster.createTopic(TEST_TOPIC_1, 1, 1);
+    kafkaCluster.createTopic(TEST_TOPIC_2, 2, 1);
+    Thread.sleep(STABILIZE_SLEEP_DELAYS);
+    produceMsgToKafka();
+    Thread.sleep(STABILIZE_SLEEP_DELAYS);
+  }
+
+  private static void produceMsgToKafka() {
+    Properties props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBroker());
+    props.put(ProducerConfig.CLIENT_ID_CONFIG, "clientId");
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    KafkaProducer p = new KafkaProducer<>(props);
+    for (int i = 0; i < NUM_MSG_PRODUCED; i++) {
+      p.send(new ProducerRecord(TEST_TOPIC_1, "sample_msg_" + i));
+      p.send(new ProducerRecord(TEST_TOPIC_2, "sample_msg_" + i));
+    }
+  }
+
+  private static String getKafkaBroker() {
+    return "127.0.0.1:" + kafkaCluster.getKafkaServerPort(0);
+  }
+
+  @AfterClass
+  public static void shutDown()
+      throws Exception {
+    kafkaCluster.deleteTopic(TEST_TOPIC_1);
+    kafkaCluster.deleteTopic(TEST_TOPIC_2);
+    kafkaCluster.close();
+  }
+
+  @Test
+  public void testBuildConsumer()
+      throws Exception {
+    String streamType = "kafka";
+    String streamKafkaTopicName = "theTopic";
+    String streamKafkaBrokerList = "127.0.0.1:" + kafkaCluster.getKafkaServerPort(0);
+    String streamKafkaConsumerType = "simple";
+    String clientId = "clientId";
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", streamKafkaTopicName);
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", Kafka2ConsumerFactory.class.getName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    streamConfigMap.put("stream.kafka.fetcher.size", "10000");
+    streamConfigMap.put("stream.kafka.fetcher.minBytes", "20000");
+    StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+
+    Kafka2PartitionLevelStreamMetadataProvider streamMetadataProvider =
+        new Kafka2PartitionLevelStreamMetadataProvider(clientId, streamConfig);
+
+    // test default value
+    Kafka2PartitionLevelPartitionLevelConsumer kafkaSimpleStreamConsumer =
+        new Kafka2PartitionLevelPartitionLevelConsumer(clientId, streamConfig, 0);
+    kafkaSimpleStreamConsumer.fetchMessages(12345L, 23456L, 10000);
+
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+        kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaBufferSize());
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+        kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaSocketTimeout());
+
+    // test parsing values
+    Assert.assertEquals(10000,
+        kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaFetcherSizeBytes());
+    Assert
+        .assertEquals(20000, kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaFetcherMinBytes());
+
+    // test user defined values
+    streamConfigMap.put("stream.kafka.buffer.size", "100");
+    streamConfigMap.put("stream.kafka.socket.timeout", "1000");
+    streamConfig = new StreamConfig(streamConfigMap);
+    kafkaSimpleStreamConsumer = new Kafka2PartitionLevelPartitionLevelConsumer(clientId, streamConfig, 0);
+    kafkaSimpleStreamConsumer.fetchMessages(12345L, 23456L, 10000);
+    Assert.assertEquals(100, kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaBufferSize());
+    Assert.assertEquals(1000, kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaSocketTimeout());
+  }
+
+  @Test
+  public void testGetPartitionCount() {
+    String streamType = "kafka";
+    String streamKafkaBrokerList = "127.0.0.1:" + kafkaCluster.getKafkaServerPort(0);
+    String streamKafkaConsumerType = "simple";
+    String clientId = "clientId";
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", TEST_TOPIC_1);
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", Kafka2ConsumerFactory.class.getName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+
+    Kafka2PartitionLevelStreamMetadataProvider streamMetadataProvider =
+        new Kafka2PartitionLevelStreamMetadataProvider(clientId, streamConfig);
+    Assert.assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 1);
+
+    streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", TEST_TOPIC_2);
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", Kafka2ConsumerFactory.class.getName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    streamConfig = new StreamConfig(streamConfigMap);
+
+    streamMetadataProvider = new Kafka2PartitionLevelStreamMetadataProvider(clientId, streamConfig);
+    Assert.assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 2);
+  }
+
+  @Test
+  public void testFetchMessages()
+      throws Exception {
+    String streamType = "kafka";
+    String streamKafkaTopicName = "theTopic";
+    String streamKafkaBrokerList = "127.0.0.1:" + kafkaCluster.getKafkaServerPort(0);
+    String streamKafkaConsumerType = "simple";
+    String clientId = "clientId";
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", streamKafkaTopicName);
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", Kafka2ConsumerFactory.class.getName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+
+    int partition = 0;
+    Kafka2PartitionLevelPartitionLevelConsumer kafkaSimpleStreamConsumer =
+        new Kafka2PartitionLevelPartitionLevelConsumer(clientId, streamConfig, partition);
+    kafkaSimpleStreamConsumer.fetchMessages(12345L, 23456L, 10000);
+  }
+
+  @Test
+  public void testFetchOffsets()
+      throws Exception {
+    testFetchOffsets(TEST_TOPIC_1);
+    testFetchOffsets(TEST_TOPIC_2);
+  }
+
+  private void testFetchOffsets(String topic)
+      throws Exception {
+    String streamType = "kafka";
+    String streamKafkaBrokerList = "127.0.0.1:" + kafkaCluster.getKafkaServerPort(0);
+    String streamKafkaConsumerType = "simple";
+    String clientId = "clientId";
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", topic);
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", Kafka2ConsumerFactory.class.getName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+
+    int numPartitions =
+        new Kafka2PartitionLevelStreamMetadataProvider(clientId, streamConfig).fetchPartitionCount(10000);
+    for (int partition = 0; partition < numPartitions; partition++) {
+      Kafka2PartitionLevelStreamMetadataProvider kafkaStreamMetadataProvider =
+          new Kafka2PartitionLevelStreamMetadataProvider(clientId, streamConfig, partition);
+      Assert.assertEquals(0, kafkaStreamMetadataProvider
+          .fetchPartitionOffset(new OffsetCriteria.OffsetCriteriaBuilder().withOffsetSmallest(), 10000));
+      Assert.assertEquals(NUM_MSG_PRODUCED / numPartitions, kafkaStreamMetadataProvider
+          .fetchPartitionOffset(new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest(), 10000));
+    }
+  }
+}
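
A side note on the test above: produceMsgToKafka uses the producer with raw types and never flushes or closes it, relying on the stabilization sleeps for delivery. An equivalent typed sketch with an explicit flush and close (illustrative only, not part of this commit):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    class TypedProducerSketch {
      static void produce(String broker, String topic, int count) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
          for (int i = 0; i < count; i++) {
            producer.send(new ProducerRecord<>(topic, "sample_msg_" + i));
          }
          producer.flush();                                      // ensure delivery before assertions run
        }                                                        // close() happens via try-with-resources
      }
    }
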
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfigTest.java b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfigTest.java
new file mode 100644
index 0000000..df02b9f
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfigTest.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class KafkaPartitionLevelStreamConfigTest {
+
+  private Kafka2PartitionLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer,
+      String socketTimeout) {
+    return getStreamConfig(topic, bootstrapHosts, buffer, socketTimeout, null, null);
+  }
+
+  private Kafka2PartitionLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer,
+      String socketTimeout, String fetcherSize, String fetcherMinBytes) {
+    Map<String, String> streamConfigMap = new HashMap<>();
+    String streamType = "kafka";
+    String consumerType = StreamConfig.ConsumerType.LOWLEVEL.toString();
+    String consumerFactoryClassName = Kafka2ConsumerFactory.class.getName();
+    String decoderClass = KafkaAvroMessageDecoder.class.getName();
+    streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
+    streamConfigMap
+        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
+            topic);
+    streamConfigMap
+        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+            consumerType);
+    streamConfigMap.put(StreamConfigProperties
+            .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
+        consumerFactoryClassName);
+    streamConfigMap
+        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+            decoderClass);
+    streamConfigMap.put("stream.kafka.broker.list", bootstrapHosts);
+    if (buffer != null) {
+      streamConfigMap.put("stream.kafka.buffer.size", buffer);
+    }
+    if (socketTimeout != null) {
+      streamConfigMap.put("stream.kafka.socket.timeout", socketTimeout);
+    }
+    if (fetcherSize != null) {
+      streamConfigMap.put("stream.kafka.fetcher.size", fetcherSize);
+    }
+    if (fetcherMinBytes != null) {
+      streamConfigMap.put("stream.kafka.fetcher.minBytes", fetcherMinBytes);
+    }
+    return new Kafka2PartitionLevelStreamConfig(new StreamConfig(streamConfigMap));
+  }
+
+  @Test
+  public void testGetKafkaTopicName() {
+    Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "", "", "");
+    Assert.assertEquals("topic", config.getKafkaTopicName());
+  }
+
+  @Test
+  public void testGetBootstrapHosts() {
+    Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", "");
+    Assert.assertEquals("host1", config.getBootstrapHosts());
+  }
+
+  @Test
+  public void testGetKafkaBufferSize() {
+    // test default
+    Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "host1", null, "");
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+        config.getKafkaBufferSize());
+
+    config = getStreamConfig("topic", "host1", "", "");
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+        config.getKafkaBufferSize());
+
+    config = getStreamConfig("topic", "host1", "bad value", "");
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+        config.getKafkaBufferSize());
+
+    // correct config
+    config = getStreamConfig("topic", "host1", "100", "");
+    Assert.assertEquals(100, config.getKafkaBufferSize());
+  }
+
+  @Test
+  public void testGetKafkaSocketTimeout() {
+    // test default
+    Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", null);
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+        config.getKafkaSocketTimeout());
+
+    config = getStreamConfig("topic", "host1", "", "");
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+        config.getKafkaSocketTimeout());
+
+    config = getStreamConfig("topic", "host1", "", "bad value");
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+        config.getKafkaSocketTimeout());
+
+    // correct config
+    config = getStreamConfig("topic", "host1", "", "100");
+    Assert.assertEquals(100, config.getKafkaSocketTimeout());
+  }
+
+  @Test
+  public void testGetFetcherSize() {
+    // test default
+    Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", "", "", null);
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+        config.getKafkaFetcherSizeBytes());
+
+    config = getStreamConfig("topic", "host1", "100", "", "", null);
+    Assert.assertEquals(100, config.getKafkaFetcherSizeBytes());
+
+    config = getStreamConfig("topic", "host1", "100", "", "bad value", null);
+    Assert.assertEquals(100, config.getKafkaFetcherSizeBytes());
+
+    // correct config
+    config = getStreamConfig("topic", "host1", "100", "", "200", null);
+    Assert.assertEquals(200, config.getKafkaFetcherSizeBytes());
+  }
+
+  @Test
+  public void testGetFetcherMinBytes() {
+    // test default
+    Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", "", "", null);
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
+        config.getKafkaFetcherMinBytes());
+
+    config = getStreamConfig("topic", "host1", "", "", "", "");
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
+        config.getKafkaFetcherMinBytes());
+
+    config = getStreamConfig("topic", "host1", "", "", "", "bad value");
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
+        config.getKafkaFetcherMinBytes());
+
+    // correct config
+    config = getStreamConfig("topic", "host1", "", "", "", "100");
+    Assert.assertEquals(100, config.getKafkaFetcherMinBytes());
+  }
+}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/EmbeddedZooKeeper.java b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/EmbeddedZooKeeper.java
new file mode 100644
index 0000000..47370aa
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/EmbeddedZooKeeper.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2.utils;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+
+public class EmbeddedZooKeeper implements Closeable {
+
+  private static final int TICK_TIME = 500;
+  private final NIOServerCnxnFactory factory;
+  private final ZooKeeperServer zookeeper;
+  private final File tmpDir;
+  private final int port;
+
+  EmbeddedZooKeeper() throws IOException, InterruptedException {
+    this.tmpDir = Files.createTempDirectory(null).toFile();
+    this.factory = new NIOServerCnxnFactory();
+    this.zookeeper = new ZooKeeperServer(new File(tmpDir, "data"), new File(tmpDir, "log"),
+        TICK_TIME);
+    InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 0);
+    factory.configure(addr, 0);
+    factory.startup(zookeeper);
+    this.port = zookeeper.getClientPort();
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  @Override
+  public void close() throws IOException {
+    zookeeper.shutdown();
+    factory.shutdown();
+    FileUtils.deleteDirectory(tmpDir);
+  }
+}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/MiniKafkaCluster.java b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/MiniKafkaCluster.java
new file mode 100644
index 0000000..3ec32fc
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/MiniKafkaCluster.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2.utils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.Seq;
+
+
+public final class MiniKafkaCluster implements Closeable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(MiniKafkaCluster.class);
+  private final EmbeddedZooKeeper zkServer;
+  private final ArrayList<KafkaServer> kafkaServer;
+  private final Path tempDir;
+  private final AdminClient adminClient;
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private MiniKafkaCluster(List<String> brokerIds)
+      throws IOException, InterruptedException {
+    this.zkServer = new EmbeddedZooKeeper();
+    this.tempDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "mini-kafka-cluster");
+    this.kafkaServer = new ArrayList<>();
+    int port = 0;
+    for (String id : brokerIds) {
+      port = getAvailablePort();
+      KafkaConfig c = new KafkaConfig(createBrokerConfig(id, port));
+      Seq seq =
+          scala.collection.JavaConverters.collectionAsScalaIterableConverter(Collections.emptyList()).asScala().toSeq();
+      kafkaServer.add(new KafkaServer(c, Time.SYSTEM, Option.empty(), seq));
+    }
+    Properties props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + port);
+    adminClient = AdminClient.create(props);
+  }
+
+  static int getAvailablePort() {
+    try {
+      try (ServerSocket socket = new ServerSocket(0)) {
+        return socket.getLocalPort();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to find available port to use", e);
+    }
+  }
+
+  private Properties createBrokerConfig(String nodeId, int port)
+      throws IOException {
+    Properties props = new Properties();
+    props.put("broker.id", nodeId);
+    props.put("port", Integer.toString(port));
+    props.put("log.dir", Files.createTempDirectory(tempDir, "broker-").toAbsolutePath().toString());
+    props.put("zookeeper.connect", "127.0.0.1:" + zkServer.getPort());
+    props.put("replica.socket.timeout.ms", "1500");
+    props.put("controller.socket.timeout.ms", "1500");
+    props.put("controlled.shutdown.enable", "true");
+    props.put("delete.topic.enable", "true");
+    props.put("auto.create.topics.enable", "true");
+    props.put("offsets.topic.replication.factor", "1");
+    props.put("controlled.shutdown.retry.backoff.ms", "100");
+    props.put("log.cleaner.dedupe.buffer.size", "2097152");
+    return props;
+  }
+
+  public void start() {
+    for (KafkaServer s : kafkaServer) {
+      s.startup();
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    for (KafkaServer s : kafkaServer) {
+      s.shutdown();
+    }
+    this.zkServer.close();
+    FileUtils.deleteDirectory(tempDir.toFile());
+  }
+
+  public EmbeddedZooKeeper getZkServer() {
+    return zkServer;
+  }
+
+  public List<KafkaServer> getKafkaServer() {
+    return kafkaServer;
+  }
+
+  public int getKafkaServerPort(int index) {
+    return kafkaServer.get(index).socketServer()
+        .boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT));
+  }
+
+  public AdminClient getAdminClient() {
+    return adminClient;
+  }
+
+  public boolean createTopic(String topicName, int numPartitions, int replicationFactor) {
+    NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) replicationFactor);
+    CreateTopicsResult createTopicsResult = this.adminClient.createTopics(Arrays.asList(newTopic));
+    try {
+      createTopicsResult.all().get();
+    } catch (InterruptedException | ExecutionException e) {
+      LOGGER.error("Failed to create Kafka topic: {}, Exception: {}", newTopic.toString(), e);
+      return false;
+    }
+    return true;
+  }
+
+  public boolean deleteTopic(String topicName) {
+    final DeleteTopicsResult deleteTopicsResult = this.adminClient.deleteTopics(Collections.singletonList(topicName));
+    try {
+      deleteTopicsResult.all().get();
+    } catch (InterruptedException | ExecutionException e) {
+      LOGGER.error("Failed to delete Kafka topic: {}, Exception: {}", topicName, e);
+      return false;
+    }
+    return true;
+  }
+
+  public static class Builder {
+
+    private List<String> brokerIds = new ArrayList<>();
+
+    public Builder newServer(String brokerId) {
+      brokerIds.add(brokerId);
+      return this;
+    }
+
+    public MiniKafkaCluster build()
+        throws IOException, InterruptedException {
+      return new MiniKafkaCluster(brokerIds);
+    }
+  }
+}
\ No newline at end of file
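
A hypothetical standalone usage of the test cluster, mirroring the setup and teardown in KafkaPartitionLevelConsumerTest above:

    import org.apache.pinot.core.realtime.impl.kafka2.utils.MiniKafkaCluster;

    class MiniKafkaClusterUsage {
      public static void main(String[] args) throws Exception {
        MiniKafkaCluster cluster = new MiniKafkaCluster.Builder().newServer("0").build();
        cluster.start();
        cluster.createTopic("myTopic", 2, 1);                          // 2 partitions, replication factor 1
        String broker = "127.0.0.1:" + cluster.getKafkaServerPort(0);  // bootstrap address for clients
        // ... run producers or a Pinot consumer against `broker` ...
        cluster.deleteTopic("myTopic");
        cluster.close();
      }
    }
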
diff --git a/pinot-connectors/pom.xml b/pinot-connectors/pom.xml
index 64d798d..1b45cc1 100644
--- a/pinot-connectors/pom.xml
+++ b/pinot-connectors/pom.xml
@@ -47,11 +47,23 @@
       <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-common</artifactId>
       <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.scala-lang</groupId>
+          <artifactId>scala-library</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-core</artifactId>
       <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.scala-lang</groupId>
+          <artifactId>scala-library</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <!-- test -->


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org