Posted to commits@pinot.apache.org by xi...@apache.org on 2019/07/03 10:13:07 UTC

[incubator-pinot] 02/02: Adding support for Kafka 2.0

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch kafka_2.0
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit d9e031618c4c7fa28e64d62858aa7e4a36d6f279
Author: Xiang Fu <xi...@traceable.ai>
AuthorDate: Mon Jul 1 16:17:25 2019 -0700

    Adding support for Kafka 2.0
---
 pinot-connectors/pinot-connector-kafka-0.9/pom.xml |   5 +
 pinot-connectors/pinot-connector-kafka-2.0/pom.xml | 112 +++++---
 ...umerFactory.java => Kafka2ConsumerFactory.java} |  36 +--
 .../impl/kafka2/Kafka2ConsumerManager.java         | 191 ++++++++++++++
 .../impl/kafka2/Kafka2HighLevelStreamConfig.java   | 135 ++++++++++
 .../realtime/impl/kafka2/Kafka2MessageBatch.java   |  61 +++++
 .../Kafka2PartitionLevelConnectionHandler.java     |  67 +++++
 ...Kafka2PartitionLevelPartitionLevelConsumer.java |  65 +++++
 .../kafka2/Kafka2PartitionLevelStreamConfig.java   | 146 +++++++++++
 ...Kafka2PartitionLevelStreamMetadataProvider.java |  67 +++++
 ...ties.java => Kafka2StreamConfigProperties.java} |  32 +--
 .../impl/kafka2/Kafka2StreamLevelConsumer.java     | 166 ++++++++++++
 .../impl/kafka2/KafkaAvroMessageDecoder.java       | 290 +++++++++++++++++++++
 .../impl/kafka2/KafkaConnectionHandler.java        |  61 -----
 .../impl/kafka2/KafkaJSONMessageDecoder.java       |  63 +++++
 .../realtime/impl/kafka2/KafkaMessageBatch.java    |  65 -----
 .../impl/kafka2/KafkaPartitionConsumer.java        |  51 ----
 .../kafka2/KafkaPartitionLevelStreamConfig.java    | 144 ----------
 .../impl/kafka2/KafkaStreamMetadataProvider.java   |  81 ------
 .../realtime/impl/kafka2/MessageAndOffset.java     |  42 +--
 .../kafka2/KafkaPartitionLevelConsumerTest.java    | 232 +++++++++++++++++
 .../KafkaPartitionLevelStreamConfigTest.java       | 161 ++++++++++++
 .../impl/kafka2/utils/EmbeddedZooKeeper.java       |  60 +++++
 .../impl/kafka2/utils/MiniKafkaCluster.java        | 175 +++++++++++++
 pinot-connectors/pom.xml                           |  12 +
 25 files changed, 2024 insertions(+), 496 deletions(-)
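
For orientation, a realtime table would point at this connector through its stream configs. The sketch below is illustrative only: the property keys mirror Kafka2StreamConfigProperties added in this commit, while the topic name, broker address, consumer type, and the consumer factory wiring are assumptions for the example rather than part of the change.

import java.util.HashMap;
import java.util.Map;

public class Kafka2StreamConfigExample {
  public static void main(String[] args) {
    Map<String, String> streamConfigs = new HashMap<>();
    streamConfigs.put("streamType", "kafka");
    streamConfigs.put("stream.kafka.topic.name", "myTopic");            // assumed topic
    streamConfigs.put("stream.kafka.consumer.type", "lowlevel");        // partition level consumption
    // Keys below mirror Kafka2StreamConfigProperties.LowLevelConsumer / HighLevelConsumer in this commit.
    streamConfigs.put("stream.kafka.broker.list", "localhost:9092");
    streamConfigs.put("stream.kafka.hlc.bootstrap.server", "localhost:9092");
    // Anything under the kafka.consumer.prop prefix is passed through to the Kafka consumer as-is.
    streamConfigs.put("stream.kafka.consumer.prop.auto.offset.reset", "earliest");
    // Assumed wiring for the consumer factory added in this commit.
    streamConfigs.put("stream.kafka.consumer.factory.class.name",
        "org.apache.pinot.core.realtime.impl.kafka2.Kafka2ConsumerFactory");
    streamConfigs.forEach((key, value) -> System.out.println(key + " = " + value));
  }
}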

diff --git a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
index ae0317e..852c29c 100644
--- a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
+++ b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
@@ -63,5 +63,10 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>2.10.5</version>
+    </dependency>
   </dependencies>
 </project>
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/pom.xml b/pinot-connectors/pinot-connector-kafka-2.0/pom.xml
index f351219..2a9c155 100644
--- a/pinot-connectors/pinot-connector-kafka-2.0/pom.xml
+++ b/pinot-connectors/pinot-connector-kafka-2.0/pom.xml
@@ -22,46 +22,82 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>pinot-connectors</artifactId>
-        <groupId>org.apache.pinot</groupId>
-        <version>0.2.0-SNAPSHOT</version>
-        <relativePath>..</relativePath>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>pinot-connectors</artifactId>
+    <groupId>org.apache.pinot</groupId>
+    <version>0.2.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>pinot-connector-kafka-2.0</artifactId>
+  <name>Pinot Connector Kafka 2.0</name>
+  <url>https://pinot.apache.org/</url>
+  <properties>
+    <pinot.root>${basedir}/../..</pinot.root>
+    <kafka.version>2.0.0</kafka.version>
+  </properties>
 
-    <artifactId>pinot-connector-kafka-2.0</artifactId>
+  <dependencies>
 
-    <properties>
-        <pinot.root>${basedir}/../..</pinot.root>
-        <kafka.version>2.0.0</kafka.version>
-    </properties>
+    <!-- Kafka  -->
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.sf.jopt-simple</groupId>
+          <artifactId>jopt-simple</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.scala-lang</groupId>
+          <artifactId>scala-library</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
 
-    <dependencies>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.12</artifactId>
+      <version>${kafka.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.sf.jopt-simple</groupId>
+          <artifactId>jopt-simple</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.scala-lang</groupId>
+          <artifactId>scala-library</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.scala-lang</groupId>
+          <artifactId>scala-reflect</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+      </exclusions>
+      <scope>test</scope>
+    </dependency>
 
-        <!-- Kafka  -->
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>${kafka.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>net.sf.jopt-simple</groupId>
-                    <artifactId>jopt-simple</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.scala-lang</groupId>
-                    <artifactId>scala-library</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-    </dependencies>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>2.12.8</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
 </project>
\ No newline at end of file
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerFactory.java
similarity index 57%
rename from pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java
rename to pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerFactory.java
index cc3d8a6..3eab517 100644
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerFactory.java
@@ -26,24 +26,26 @@ import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
 import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
 import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
 
-public class KafkaConsumerFactory extends StreamConsumerFactory {
-    @Override
-    public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {
-        return new KafkaPartitionConsumer(_streamConfig, partition);
-    }
 
-    @Override
-    public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema, InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) {
-        throw new UnsupportedOperationException("High level consumer not supported in kafka 2. Use Kafka partition level consumers");
-    }
+public class Kafka2ConsumerFactory extends StreamConsumerFactory {
+  @Override
+  public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {
+    return new Kafka2PartitionLevelPartitionLevelConsumer(clientId, _streamConfig, partition);
+  }
 
-    @Override
-    public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
-        return null;
-    }
+  @Override
+  public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema,
+      InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) {
+    return new Kafka2StreamLevelConsumer(clientId, tableName, _streamConfig, schema, instanceZKMetadata, serverMetrics);
+  }
 
-    @Override
-    public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
-        throw new UnsupportedOperationException("High level consumer not supported in kafka 2. Use Kafka partition level consumers");
-    }
+  @Override
+  public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
+    return new Kafka2PartitionLevelStreamMetadataProvider(clientId, _streamConfig, partition);
+  }
+
+  @Override
+  public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
+    return new Kafka2PartitionLevelStreamMetadataProvider(clientId, _streamConfig);
+  }
 }
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerManager.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerManager.java
new file mode 100644
index 0000000..74e3ee2
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerManager.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.ImmutableTriple;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Manager for Kafka consumers that reuses consumers and delays their shutdown.
+ *
+ * This is a workaround for a flaw in the current realtime design, where any issue while flushing or committing offsets
+ * causes duplicate or dropped events. Kafka consumption is driven by the controller, which assigns realtime segments to
+ * the servers; when a server is assigned a new realtime segment, it creates a Kafka consumer, consumes until it reaches
+ * a threshold, flushes to disk, writes metadata to Helix indicating that the segment is completed, commits the Kafka
+ * offsets to ZK and then shuts down the consumer. The controller notices the metadata write and assigns a new segment
+ * to the server so that it can keep consuming.
+ *
+ * This logic breaks down if committing the Kafka offsets fails, at which point the committed state is unknown. The
+ * proper fix would be to keep using the same consumer and retry committing the offsets later. Instead, a new Kafka
+ * consumer is created for every new segment while the old one is kept around, and Kafka rebalances the partitions
+ * between the two consumers, so roughly half of the events end up assigned to a consumer that is no longer actually
+ * reading anything. Because that logic is stateless and driven by Helix, there is no clean way to keep the consumer
+ * alive and hand it to the next segment.
+ *
+ * This class works around the issue by keeping the consumer alive for a short while instead of shutting it down
+ * immediately, so that the next segment assignment can pick up the same consumer. That way, even if committing the
+ * offsets fails, the server can reuse the same consumer the next time the controller assigns it a segment and commit
+ * the offsets the next time it flushes to disk.
+ *
+ * This temporary code should be completely removed by the time we redesign the consumption to use the lower level
+ * Kafka APIs.
+ */
+public class Kafka2ConsumerManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Kafka2ConsumerManager.class);
+  private static final Long IN_USE = -1L;
+  private static final long CONSUMER_SHUTDOWN_DELAY_MILLIS = TimeUnit.SECONDS.toMillis(60); // One minute
+  private static final Map<ImmutableTriple<String, String, String>, KafkaConsumer> CONSUMER_FOR_CONFIG_KEY =
+      new HashMap<>();
+  private static final IdentityHashMap<KafkaConsumer, Long> CONSUMER_RELEASE_TIME = new IdentityHashMap<>();
+
+  public static KafkaConsumer acquireKafkaConsumerForConfig(Kafka2HighLevelStreamConfig kafka2HighLevelStreamConfig) {
+    final ImmutableTriple<String, String, String> configKey =
+        new ImmutableTriple<>(kafka2HighLevelStreamConfig.getKafkaTopicName(), kafka2HighLevelStreamConfig.getGroupId(),
+            kafka2HighLevelStreamConfig.getBootstrapServers());
+
+    synchronized (Kafka2ConsumerManager.class) {
+      // If we have the consumer and it's not already acquired, return it, otherwise error out if it's already acquired
+      if (CONSUMER_FOR_CONFIG_KEY.containsKey(configKey)) {
+        KafkaConsumer kafkaConsumer = CONSUMER_FOR_CONFIG_KEY.get(configKey);
+        if (CONSUMER_RELEASE_TIME.get(kafkaConsumer).equals(IN_USE)) {
+          throw new RuntimeException("Consumer/iterator " + kafkaConsumer + " already in use!");
+        } else {
+          LOGGER.info("Reusing kafka consumer/iterator with id {}", kafkaConsumer);
+          CONSUMER_RELEASE_TIME.put(kafkaConsumer, IN_USE);
+          return kafkaConsumer;
+        }
+      }
+
+      LOGGER.info("Creating new kafka consumer and iterator for topic {}",
+          kafka2HighLevelStreamConfig.getKafkaTopicName());
+
+      // Create the consumer
+
+      Properties consumerProp = new Properties();
+      consumerProp.putAll(kafka2HighLevelStreamConfig.getKafkaConsumerProperties());
+      consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka2HighLevelStreamConfig.getBootstrapServers());
+      consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+      consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+      KafkaConsumer consumer = new KafkaConsumer<>(consumerProp);
+      consumer.subscribe(Collections.singletonList(kafka2HighLevelStreamConfig.getKafkaTopicName()));
+
+      // Mark both the consumer and iterator as acquired
+      CONSUMER_FOR_CONFIG_KEY.put(configKey, consumer);
+      CONSUMER_RELEASE_TIME.put(consumer, IN_USE);
+
+      LOGGER.info("Created consumer/iterator with id {} for topic {}", consumer,
+          kafka2HighLevelStreamConfig.getKafkaTopicName());
+
+      return consumer;
+    }
+  }
+
+  public static void releaseKafkaConsumer(final KafkaConsumer kafkaConsumer) {
+    synchronized (Kafka2ConsumerManager.class) {
+      // Release the consumer, mark it for shutdown in the future
+      final long releaseTime = System.currentTimeMillis() + CONSUMER_SHUTDOWN_DELAY_MILLIS;
+      CONSUMER_RELEASE_TIME.put(kafkaConsumer, releaseTime);
+
+      LOGGER.info("Marking consumer/iterator with id {} for release at {}", kafkaConsumer, releaseTime);
+
+      // Schedule the shutdown of the consumer
+      new Thread() {
+        @Override
+        public void run() {
+          try {
+            // Await the shutdown time
+            Uninterruptibles.sleepUninterruptibly(CONSUMER_SHUTDOWN_DELAY_MILLIS, TimeUnit.MILLISECONDS);
+
+            // Shutdown all consumers that have not been re-acquired
+            synchronized (Kafka2ConsumerManager.class) {
+              LOGGER.info("Executing release check for consumer/iterator {} at {}, scheduled at ", kafkaConsumer,
+                  System.currentTimeMillis(), releaseTime);
+
+              Iterator<Map.Entry<ImmutableTriple<String, String, String>, KafkaConsumer>> configIterator =
+                  CONSUMER_FOR_CONFIG_KEY.entrySet().iterator();
+
+              while (configIterator.hasNext()) {
+                Map.Entry<ImmutableTriple<String, String, String>, KafkaConsumer> entry = configIterator.next();
+                KafkaConsumer kafkaConsumer = entry.getValue();
+
+                final Long releaseTime = CONSUMER_RELEASE_TIME.get(kafkaConsumer);
+                if (!releaseTime.equals(IN_USE) && releaseTime < System.currentTimeMillis()) {
+                  LOGGER.info("Releasing consumer/iterator {}", kafkaConsumer);
+
+                  try {
+                    kafkaConsumer.close();
+                  } catch (Exception e) {
+                    LOGGER.warn("Caught exception while shutting down Kafka consumer with id {}", kafkaConsumer, e);
+                  }
+
+                  configIterator.remove();
+                  CONSUMER_RELEASE_TIME.remove(kafkaConsumer);
+                } else {
+                  LOGGER.info("Not releasing consumer/iterator {}, it has been reacquired", kafkaConsumer);
+                }
+              }
+            }
+          } catch (Exception e) {
+            LOGGER.warn("Caught exception in release of consumer/iterator {}", e, kafkaConsumer);
+          }
+        }
+      }.start();
+    }
+  }
+
+  public static void closeAllConsumers() {
+    try {
+      // Shutdown all consumers
+      synchronized (Kafka2ConsumerManager.class) {
+        LOGGER.info("Trying to shutdown all the kafka consumers");
+        Iterator<KafkaConsumer> consumerIterator = CONSUMER_FOR_CONFIG_KEY.values().iterator();
+
+        while (consumerIterator.hasNext()) {
+          KafkaConsumer kafkaConsumer = consumerIterator.next();
+          LOGGER.info("Trying to shutdown consumer/iterator {}", kafkaConsumer);
+          try {
+            kafkaConsumer.close();
+          } catch (Exception e) {
+            LOGGER.warn("Caught exception while shutting down Kafka consumer with id {}", kafkaConsumer, e);
+          }
+          consumerIterator.remove();
+        }
+        CONSUMER_FOR_CONFIG_KEY.clear();
+        CONSUMER_RELEASE_TIME.clear();
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception during shutting down all kafka consumers", e);
+    }
+  }
+}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2HighLevelStreamConfig.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2HighLevelStreamConfig.java
new file mode 100644
index 0000000..f866288
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2HighLevelStreamConfig.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
+import org.apache.pinot.common.utils.EqualityUtils;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
+
+
+/**
+ * Wrapper around {@link StreamConfig} for use in the {@link Kafka2StreamLevelConsumer}
+ */
+public class Kafka2HighLevelStreamConfig {
+  private static final String DEFAULT_AUTO_COMMIT_ENABLE = "false";
+
+  private static final Map<String, String> defaultProps;
+  private String _kafkaTopicName;
+  private String _groupId;
+  private String _bootstrapServers;
+  private Map<String, String> _kafkaConsumerProperties;
+
+  /**
+   * Builds a wrapper around {@link StreamConfig} to fetch kafka stream level consumer specific configs
+   * @param streamConfig
+   * @param tableName
+   * @param instanceZKMetadata
+   */
+  public Kafka2HighLevelStreamConfig(StreamConfig streamConfig, String tableName,
+      InstanceZKMetadata instanceZKMetadata) {
+    Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
+
+    _kafkaTopicName = streamConfig.getTopicName();
+    String hlcBootstrapBrokerUrlKey = Kafka2StreamConfigProperties
+        .constructStreamProperty(Kafka2StreamConfigProperties.HighLevelConsumer.KAFKA_HLC_BOOTSTRAP_SERVER);
+    _bootstrapServers = streamConfigMap.get(hlcBootstrapBrokerUrlKey);
+    Preconditions.checkNotNull(_bootstrapServers,
+        "Must specify bootstrap broker connect string " + hlcBootstrapBrokerUrlKey + " in high level kafka consumer");
+    _groupId = instanceZKMetadata.getGroupId(tableName);
+
+    _kafkaConsumerProperties = new HashMap<>();
+    String kafkaConsumerPropertyPrefix =
+        Kafka2StreamConfigProperties.constructStreamProperty(Kafka2StreamConfigProperties.KAFKA_CONSUMER_PROP_PREFIX);
+    for (String key : streamConfigMap.keySet()) {
+      if (key.startsWith(kafkaConsumerPropertyPrefix)) {
+        _kafkaConsumerProperties
+            .put(StreamConfigProperties.getPropertySuffix(key, kafkaConsumerPropertyPrefix), streamConfigMap.get(key));
+      }
+    }
+  }
+
+  public String getKafkaTopicName() {
+    return _kafkaTopicName;
+  }
+
+  public String getGroupId() {
+    return _groupId;
+  }
+
+  public Properties getKafkaConsumerProperties() {
+    Properties props = new Properties();
+    for (String key : defaultProps.keySet()) {
+      props.put(key, defaultProps.get(key));
+    }
+    for (String key : _kafkaConsumerProperties.keySet()) {
+      props.put(key, _kafkaConsumerProperties.get(key));
+    }
+    props.put("group.id", _groupId);
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _bootstrapServers);
+    return props;
+  }
+
+  @Override
+  public String toString() {
+    return "Kafka2HighLevelStreamConfig{" + "_kafkaTopicName='" + _kafkaTopicName + '\'' + ", _groupId='" + _groupId
+        + '\'' + ", _bootstrapServers='" + _bootstrapServers + '\'' + ", _kafkaConsumerProperties="
+        + _kafkaConsumerProperties + '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (EqualityUtils.isSameReference(this, o)) {
+      return true;
+    }
+
+    if (EqualityUtils.isNullOrNotSameClass(this, o)) {
+      return false;
+    }
+
+    Kafka2HighLevelStreamConfig that = (Kafka2HighLevelStreamConfig) o;
+
+    return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName) && EqualityUtils
+        .isEqual(_groupId, that._groupId) && EqualityUtils.isEqual(_bootstrapServers, that._bootstrapServers)
+        && EqualityUtils.isEqual(_kafkaConsumerProperties, that._kafkaConsumerProperties);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = EqualityUtils.hashCodeOf(_kafkaTopicName);
+    result = EqualityUtils.hashCodeOf(result, _groupId);
+    result = EqualityUtils.hashCodeOf(result, _bootstrapServers);
+    result = EqualityUtils.hashCodeOf(result, _kafkaConsumerProperties);
+    return result;
+  }
+
+  public String getBootstrapServers() {
+    return _bootstrapServers;
+  }
+
+  static {
+    defaultProps = new HashMap<>();
+    defaultProps.put(Kafka2StreamConfigProperties.HighLevelConsumer.AUTO_COMMIT_ENABLE, DEFAULT_AUTO_COMMIT_ENABLE);
+  }
+}
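
To make the property mapping above concrete: keys under the stream.kafka.consumer.prop prefix are copied into the consumer Properties with the prefix stripped, defaults are applied first, and group.id plus bootstrap.servers always come from the wrapper. A minimal, self-contained sketch of that mapping (the helper is illustrative, not code from this commit):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ConsumerPropPrefixSketch {
  private static final String PREFIX = "stream.kafka.consumer.prop.";

  // Illustrative equivalent of Kafka2HighLevelStreamConfig#getKafkaConsumerProperties.
  static Properties toConsumerProperties(Map<String, String> streamConfigs, String groupId, String bootstrapServers) {
    Properties props = new Properties();
    props.put("auto.commit.enable", "false"); // default, mirroring DEFAULT_AUTO_COMMIT_ENABLE
    for (Map.Entry<String, String> entry : streamConfigs.entrySet()) {
      if (entry.getKey().startsWith(PREFIX)) {
        props.put(entry.getKey().substring(PREFIX.length()), entry.getValue());
      }
    }
    props.put("group.id", groupId);
    props.put("bootstrap.servers", bootstrapServers);
    return props;
  }

  public static void main(String[] args) {
    Map<String, String> streamConfigs = new HashMap<>();
    streamConfigs.put("stream.kafka.consumer.prop.auto.offset.reset", "earliest");
    // auto.offset.reset reaches the Kafka consumer without the stream.kafka.consumer.prop prefix.
    System.out.println(toConsumerProperties(streamConfigs, "myTable_REALTIME", "localhost:9092"));
  }
}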
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2MessageBatch.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2MessageBatch.java
new file mode 100644
index 0000000..13bd41b
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2MessageBatch.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.pinot.core.realtime.stream.MessageBatch;
+
+
+public class Kafka2MessageBatch implements MessageBatch<byte[]> {
+
+  private List<MessageAndOffset> messageList = new ArrayList<>();
+
+  public Kafka2MessageBatch(Iterable<ConsumerRecord<String, byte[]>> iterable) {
+    for (ConsumerRecord<String, byte[]> record : iterable) {
+      messageList.add(new MessageAndOffset(record.value(), record.offset()));
+    }
+  }
+
+  @Override
+  public int getMessageCount() {
+    return messageList.size();
+  }
+
+  @Override
+  public byte[] getMessageAtIndex(int index) {
+    return messageList.get(index).getMessage().array();
+  }
+
+  @Override
+  public int getMessageOffsetAtIndex(int index) {
+    return messageList.get(index).getMessage().arrayOffset();
+  }
+
+  @Override
+  public int getMessageLengthAtIndex(int index) {
+    return messageList.get(index).payloadSize();
+  }
+
+  @Override
+  public long getNextStreamMessageOffsetAtIndex(int index) {
+    return messageList.get(index).getNextOffset();
+  }
+}
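
For orientation, a caller walks a batch like the one above by index and uses the next-offset of the last message as the start of the following fetch. A small, illustrative loop under that assumption (the fetch signature is the PartitionLevelConsumer one added later in this commit; the offsets and timeout are placeholders):

import java.util.concurrent.TimeoutException;
import org.apache.pinot.core.realtime.stream.MessageBatch;
import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;

public class MessageBatchLoopSketch {
  // Illustrative only: drains one fetch worth of messages and returns the offset to resume from.
  static long drainOnce(PartitionLevelConsumer consumer, long startOffset) throws TimeoutException {
    long nextOffset = startOffset;
    MessageBatch batch = consumer.fetchMessages(startOffset, -1, 5000); // -1: no upper bound, 5s timeout
    for (int i = 0; i < batch.getMessageCount(); i++) {
      byte[] payload = (byte[]) batch.getMessageAtIndex(i);    // would be handed to a StreamMessageDecoder
      nextOffset = batch.getNextStreamMessageOffsetAtIndex(i); // where the next fetch should start
    }
    return nextOffset;
  }
}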
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelConnectionHandler.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelConnectionHandler.java
new file mode 100644
index 0000000..3f2550d
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelConnectionHandler.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+
+
+public abstract class Kafka2PartitionLevelConnectionHandler {
+
+  protected final Kafka2PartitionLevelStreamConfig _config;
+  protected final String _clientId;
+  protected final int _partition;
+  protected final String _topic;
+  protected final Consumer<String, byte[]> _consumer;
+  protected final TopicPartition _topicPartition;
+
+  public Kafka2PartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition) {
+    _config = new Kafka2PartitionLevelStreamConfig(streamConfig);
+    _clientId = clientId;
+    _partition = partition;
+    _topic = _config.getKafkaTopicName();
+    Properties consumerProp = new Properties();
+    consumerProp.putAll(streamConfig.getStreamConfigsMap());
+    consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
+    consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+    consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+    _consumer = new KafkaConsumer<>(consumerProp);
+    _topicPartition = new TopicPartition(_topic, _partition);
+    _consumer.assign(Collections.singletonList(_topicPartition));
+  }
+
+  public void close()
+      throws IOException {
+    _consumer.close();
+  }
+
+  @VisibleForTesting
+  protected Kafka2PartitionLevelStreamConfig getKafka2PartitionLevelStreamConfig() {
+    return _config;
+  }
+}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelPartitionLevelConsumer.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelPartitionLevelConsumer.java
new file mode 100644
index 0000000..19f520a
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelPartitionLevelConsumer.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.pinot.core.realtime.stream.MessageBatch;
+import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class Kafka2PartitionLevelPartitionLevelConsumer extends Kafka2PartitionLevelConnectionHandler implements PartitionLevelConsumer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(Kafka2PartitionLevelPartitionLevelConsumer.class);
+
+  public Kafka2PartitionLevelPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
+    super(clientId, streamConfig, partition);
+  }
+
+  @Override
+  public MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis)
+      throws TimeoutException {
+    _consumer.seek(_topicPartition, startOffset);
+    ConsumerRecords<String, byte[]> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
+    final Iterable<ConsumerRecord<String, byte[]>> messageAndOffsetIterable =
+        buildOffsetFilteringIterable(consumerRecords.records(_topicPartition), startOffset, endOffset);
+    return new Kafka2MessageBatch(messageAndOffsetIterable);
+  }
+
+  private Iterable<ConsumerRecord<String, byte[]>> buildOffsetFilteringIterable(
+      final List<ConsumerRecord<String, byte[]>> messageAndOffsets, final long startOffset, final long endOffset) {
+    return Iterables.filter(messageAndOffsets, input -> {
+      // Keep non-null messages whose offsets fall in [startOffset, endOffset); endOffset == -1 means no upper bound
+      return input != null && input.offset() >= startOffset && (endOffset > input.offset() || endOffset == -1);
+    });
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    super.close();
+  }
+}
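
The filter above keeps records whose offsets fall in [startOffset, endOffset), treating endOffset == -1 as unbounded. A tiny, self-contained illustration of that predicate (not code from this commit):

public class OffsetFilterSketch {
  // Same predicate as buildOffsetFilteringIterable, expressed over plain offsets.
  static boolean inRange(long offset, long startOffset, long endOffset) {
    return offset >= startOffset && (endOffset > offset || endOffset == -1);
  }

  public static void main(String[] args) {
    System.out.println(inRange(5, 5, 10));   // true: the start offset is inclusive
    System.out.println(inRange(10, 5, 10));  // false: the end offset is exclusive
    System.out.println(inRange(100, 5, -1)); // true: -1 means no upper bound
  }
}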
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamConfig.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamConfig.java
new file mode 100644
index 0000000..fcc0e04
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamConfig.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pinot.common.utils.EqualityUtils;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+
+
+/**
+ * Wrapper around {@link StreamConfig} for use in {@link Kafka2PartitionLevelPartitionLevelConsumer}
+ */
+public class Kafka2PartitionLevelStreamConfig {
+
+  private final String _kafkaTopicName;
+  private final String _bootstrapHosts;
+  private final int _kafkaBufferSize;
+  private final int _kafkaSocketTimeout;
+  private final int _kafkaFetcherSizeBytes;
+  private final int _kafkaFetcherMinBytes;
+  private final Map<String, String> _streamConfigMap;
+
+  /**
+   * Builds a wrapper around {@link StreamConfig} to fetch kafka partition level consumer related configs
+   * @param streamConfig
+   */
+  public Kafka2PartitionLevelStreamConfig(StreamConfig streamConfig) {
+    _streamConfigMap = streamConfig.getStreamConfigsMap();
+
+    _kafkaTopicName = streamConfig.getTopicName();
+
+    String llcBrokerListKey = Kafka2StreamConfigProperties
+        .constructStreamProperty(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST);
+    String llcBufferKey = Kafka2StreamConfigProperties
+        .constructStreamProperty(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE);
+    String llcTimeoutKey = Kafka2StreamConfigProperties
+        .constructStreamProperty(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT);
+    String fetcherSizeKey = Kafka2StreamConfigProperties
+        .constructStreamProperty(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_SIZE_BYTES);
+    String fetcherMinBytesKey = Kafka2StreamConfigProperties
+        .constructStreamProperty(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES);
+    _bootstrapHosts = _streamConfigMap.get(llcBrokerListKey);
+    _kafkaBufferSize = getIntConfigWithDefault(_streamConfigMap, llcBufferKey,
+        Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT);
+    _kafkaSocketTimeout = getIntConfigWithDefault(_streamConfigMap, llcTimeoutKey,
+        Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT);
+    _kafkaFetcherSizeBytes = getIntConfigWithDefault(_streamConfigMap, fetcherSizeKey, _kafkaBufferSize);
+    _kafkaFetcherMinBytes = getIntConfigWithDefault(_streamConfigMap, fetcherMinBytesKey,
+        Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT);
+    Preconditions.checkNotNull(_bootstrapHosts,
+        "Must specify kafka brokers list " + llcBrokerListKey + " in case of low level kafka consumer");
+  }
+
+  public String getKafkaTopicName() {
+    return _kafkaTopicName;
+  }
+
+  public String getBootstrapHosts() {
+    return _bootstrapHosts;
+  }
+
+  public int getKafkaBufferSize() {
+    return _kafkaBufferSize;
+  }
+
+  public int getKafkaSocketTimeout() {
+    return _kafkaSocketTimeout;
+  }
+
+  public int getKafkaFetcherSizeBytes() {
+    return _kafkaFetcherSizeBytes;
+  }
+
+  public int getKafkaFetcherMinBytes() {
+    return _kafkaFetcherMinBytes;
+  }
+
+  private int getIntConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) {
+    String stringValue = configMap.get(key);
+    try {
+      if (StringUtils.isNotEmpty(stringValue)) {
+        return Integer.parseInt(stringValue);
+      }
+      return defaultValue;
+    } catch (NumberFormatException ex) {
+      return defaultValue;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "KafkaLowLevelStreamConfig{" + "_kafkaTopicName='" + _kafkaTopicName + '\'' + ", _bootstrapHosts='"
+        + _bootstrapHosts + '\'' + ", _kafkaBufferSize='" + _kafkaBufferSize + '\'' + ", _kafkaSocketTimeout='"
+        + _kafkaSocketTimeout + '\'' + ", _kafkaFetcherSizeBytes='" + _kafkaFetcherSizeBytes + '\''
+        + ", _kafkaFetcherMinBytes='" + _kafkaFetcherMinBytes + '\'' + '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (EqualityUtils.isSameReference(this, o)) {
+      return true;
+    }
+
+    if (EqualityUtils.isNullOrNotSameClass(this, o)) {
+      return false;
+    }
+
+    Kafka2PartitionLevelStreamConfig that = (Kafka2PartitionLevelStreamConfig) o;
+
+    return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName) && EqualityUtils
+        .isEqual(_bootstrapHosts, that._bootstrapHosts) && EqualityUtils
+        .isEqual(_kafkaBufferSize, that._kafkaBufferSize) && EqualityUtils
+        .isEqual(_kafkaSocketTimeout, that._kafkaSocketTimeout) && EqualityUtils
+        .isEqual(_kafkaFetcherSizeBytes, that._kafkaFetcherSizeBytes) && EqualityUtils
+        .isEqual(_kafkaFetcherMinBytes, that._kafkaFetcherMinBytes);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = EqualityUtils.hashCodeOf(_kafkaTopicName);
+    result = EqualityUtils.hashCodeOf(result, _bootstrapHosts);
+    result = EqualityUtils.hashCodeOf(result, _kafkaBufferSize);
+    result = EqualityUtils.hashCodeOf(result, _kafkaSocketTimeout);
+    result = EqualityUtils.hashCodeOf(result, _kafkaFetcherSizeBytes);
+    result = EqualityUtils.hashCodeOf(result, _kafkaFetcherMinBytes);
+    return result;
+  }
+}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamMetadataProvider.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamMetadataProvider.java
new file mode 100644
index 0000000..7a0558d
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamMetadataProvider.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nonnull;
+import org.apache.pinot.core.realtime.stream.OffsetCriteria;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
+
+
+public class Kafka2PartitionLevelStreamMetadataProvider extends Kafka2PartitionLevelConnectionHandler implements StreamMetadataProvider {
+
+  public Kafka2PartitionLevelStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
+    this(clientId, streamConfig, Integer.MIN_VALUE);
+  }
+
+  public Kafka2PartitionLevelStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) {
+    super(clientId, streamConfig, partition);
+  }
+
+  @Override
+  public int fetchPartitionCount(long timeoutMillis) {
+    return _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size();
+  }
+
+  @Override
+  public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)
+      throws TimeoutException {
+    Preconditions.checkNotNull(offsetCriteria);
+    if (offsetCriteria.isLargest()) {
+      return _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+          .get(_topicPartition);
+    } else if (offsetCriteria.isSmallest()) {
+      return _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+          .get(_topicPartition);
+    } else {
+      throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria.toString());
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    super.close();
+  }
+}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamConfigProperties.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamConfigProperties.java
similarity index 76%
rename from pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamConfigProperties.java
rename to pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamConfigProperties.java
index 3c45d6e..ed27dfc 100644
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamConfigProperties.java
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamConfigProperties.java
@@ -25,19 +25,22 @@ import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
 /**
  * Property key definitions for all kafka stream related properties
  */
-public class KafkaStreamConfigProperties {
+public class Kafka2StreamConfigProperties {
   public static final String DOT_SEPARATOR = ".";
-  public static final String STREAM_TYPE = "kafka";
+  public static final String KAFKA_CONSUMER_PROP_PREFIX = "kafka.consumer.prop";
+
+  /**
+   * Helper method to create a property string for kafka stream
+   * @param property
+   * @return
+   */
+  public static String constructStreamProperty(String property) {
+    return Joiner.on(DOT_SEPARATOR).join(StreamConfigProperties.STREAM_PREFIX, property);
+  }
 
   public static class HighLevelConsumer {
-    public static final String KAFKA_HLC_ZK_CONNECTION_STRING = "kafka.hlc.zk.connect.string";
-    public static final String ZK_SESSION_TIMEOUT_MS = "zookeeper.session.timeout.ms";
-    public static final String ZK_CONNECTION_TIMEOUT_MS = "zookeeper.connection.timeout.ms";
-    public static final String ZK_SYNC_TIME_MS = "zookeeper.sync.time.ms";
-    public static final String REBALANCE_MAX_RETRIES = "rebalance.max.retries";
-    public static final String REBALANCE_BACKOFF_MS = "rebalance.backoff.ms";
+    public static final String KAFKA_HLC_BOOTSTRAP_SERVER = "kafka.hlc.bootstrap.server";
     public static final String AUTO_COMMIT_ENABLE = "auto.commit.enable";
-    public static final String AUTO_OFFSET_RESET = "auto.offset.reset";
   }
 
   public static class LowLevelConsumer {
@@ -50,16 +53,5 @@ public class KafkaStreamConfigProperties {
     public static final String KAFKA_FETCHER_MIN_BYTES = "kafka.fetcher.minBytes";
     public static final int KAFKA_FETCHER_MIN_BYTES_DEFAULT = 100000;
   }
-
-  public static final String KAFKA_CONSUMER_PROP_PREFIX = "kafka.consumer.prop";
-
-  /**
-   * Helper method to create a property string for kafka stream
-   * @param property
-   * @return
-   */
-  public static String constructStreamProperty(String property) {
-    return Joiner.on(DOT_SEPARATOR).join(StreamConfigProperties.STREAM_PREFIX, property);
-  }
 }
 
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamLevelConsumer.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamLevelConsumer.java
new file mode 100644
index 0000000..4bbf975
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamLevelConsumer.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.yammer.metrics.core.Meter;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.core.realtime.stream.StreamDecoderProvider;
+import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
+import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An implementation of a {@link StreamLevelConsumer} which consumes from the kafka stream
+ */
+public class Kafka2StreamLevelConsumer implements StreamLevelConsumer {
+
+  private StreamMessageDecoder _messageDecoder;
+  private Logger INSTANCE_LOGGER;
+
+  private String _clientId;
+  private String _tableAndStreamName;
+
+  private StreamConfig _streamConfig;
+  private Kafka2HighLevelStreamConfig _kafka2HighLevelStreamConfig;
+
+  private KafkaConsumer<byte[], byte[]> consumer;
+  private ConsumerRecords<byte[], byte[]> consumerRecords;
+  private Iterator<ConsumerRecord<byte[], byte[]>> kafkaIterator;
+  private Map<Integer, Long> consumerOffsets = new HashMap<>(); // tracking current consumed records offsets.
+
+  private long lastLogTime = 0;
+  private long lastCount = 0;
+  private long currentCount = 0L;
+
+  private ServerMetrics _serverMetrics;
+  private Meter tableAndStreamRowsConsumed = null;
+  private Meter tableRowsConsumed = null;
+
+  public Kafka2StreamLevelConsumer(String clientId, String tableName, StreamConfig streamConfig, Schema schema,
+      InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) {
+    _clientId = clientId;
+    _streamConfig = streamConfig;
+    _kafka2HighLevelStreamConfig = new Kafka2HighLevelStreamConfig(streamConfig, tableName, instanceZKMetadata);
+    _serverMetrics = serverMetrics;
+
+    _messageDecoder = StreamDecoderProvider.create(streamConfig, schema);
+
+    _tableAndStreamName = tableName + "-" + streamConfig.getTopicName();
+    INSTANCE_LOGGER = LoggerFactory
+        .getLogger(Kafka2StreamLevelConsumer.class.getName() + "_" + tableName + "_" + streamConfig.getTopicName());
+  }
+
+  @Override
+  public void start()
+      throws Exception {
+    consumer = Kafka2ConsumerManager.acquireKafkaConsumerForConfig(_kafka2HighLevelStreamConfig);
+  }
+
+  private void updateKafkaIterator() {
+    consumerRecords = consumer.poll(Duration.ofMillis(_streamConfig.getFetchTimeoutMillis()));
+    kafkaIterator = consumerRecords.iterator();
+  }
+
+  @Override
+  public GenericRow next(GenericRow destination) {
+    if (!kafkaIterator.hasNext()) {
+      updateKafkaIterator();
+    }
+    if (kafkaIterator.hasNext()) {
+      try {
+        final ConsumerRecord<byte[], byte[]> record = kafkaIterator.next();
+        updateOffsets(record.partition(), record.offset());
+        destination = _messageDecoder.decode(record.value(), destination);
+        tableAndStreamRowsConsumed = _serverMetrics
+            .addMeteredTableValue(_tableAndStreamName, ServerMeter.REALTIME_ROWS_CONSUMED, 1L,
+                tableAndStreamRowsConsumed);
+        tableRowsConsumed =
+            _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L, tableRowsConsumed);
+
+        ++currentCount;
+
+        final long now = System.currentTimeMillis();
+        // Log every minute or 100k events
+        if (now - lastLogTime > 60000 || currentCount - lastCount >= 100000) {
+          if (lastCount == 0) {
+            INSTANCE_LOGGER.info("Consumed {} events from kafka stream {}", currentCount, _streamConfig.getTopicName());
+          } else {
+            INSTANCE_LOGGER.info("Consumed {} events from kafka stream {} (rate:{}/s)", currentCount - lastCount,
+                _streamConfig.getTopicName(), (float) (currentCount - lastCount) * 1000 / (now - lastLogTime));
+          }
+          lastCount = currentCount;
+          lastLogTime = now;
+        }
+        return destination;
+      } catch (Exception e) {
+        INSTANCE_LOGGER.warn("Caught exception while consuming events", e);
+        _serverMetrics.addMeteredTableValue(_tableAndStreamName, ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L);
+        _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L);
+        throw e;
+      }
+    }
+    return null;
+  }
+
+  private void updateOffsets(int partition, long offset) {
+    consumerOffsets.put(partition, offset + 1); // Kafka expects the committed offset to be the next offset to consume
+  }
+
+  @Override
+  public void commit() {
+    consumer.commitSync(getOffsetsMap());
+    consumerOffsets.clear();
+    _serverMetrics.addMeteredTableValue(_tableAndStreamName, ServerMeter.REALTIME_OFFSET_COMMITS, 1L);
+    _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_OFFSET_COMMITS, 1L);
+  }
+
+  private Map<TopicPartition, OffsetAndMetadata> getOffsetsMap() {
+    Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
+    for (Integer partition : consumerOffsets.keySet()) {
+      offsetsMap.put(new TopicPartition(_streamConfig.getTopicName(), partition),
+          new OffsetAndMetadata(consumerOffsets.get(partition)));
+    }
+    return offsetsMap;
+  }
+
+  @Override
+  public void shutdown()
+      throws Exception {
+    if (consumer != null) {
+      Kafka2ConsumerManager.releaseKafkaConsumer(consumer);
+      consumer = null;
+    }
+  }
+}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaAvroMessageDecoder.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaAvroMessageDecoder.java
new file mode 100644
index 0000000..5e09faf
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaAvroMessageDecoder.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.retry.RetryPolicies;
+import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.realtime.stream.AvroRecordToPinotRowGenerator;
+import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@NotThreadSafe
+public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaAvroMessageDecoder.class);
+
+  private static final String SCHEMA_REGISTRY_REST_URL = "schema.registry.rest.url";
+  private static final String SCHEMA_REGISTRY_SCHEMA_NAME = "schema.registry.schema.name";
+  private org.apache.avro.Schema defaultAvroSchema;
+  private MD5AvroSchemaMap md5ToAvroSchemaMap;
+
+  // A global cache for schemas across all threads.
+  private static final Map<String, org.apache.avro.Schema> globalSchemaCache = new HashMap<>();
+  // Suffix for getting the latest schema
+  private static final String LATEST = "-latest";
+
+  // Reusable byte[] to read MD5 from payload. This is OK as this class is used only by a single thread.
+  private final byte[] reusableMD5Bytes = new byte[SCHEMA_HASH_LENGTH];
+
+  private DecoderFactory decoderFactory;
+  private AvroRecordToPinotRowGenerator avroRecordConverter;
+
+  private static final int MAGIC_BYTE_LENGTH = 1;
+  private static final int SCHEMA_HASH_LENGTH = 16;
+  private static final int HEADER_LENGTH = MAGIC_BYTE_LENGTH + SCHEMA_HASH_LENGTH;
+
+  private static final int SCHEMA_HASH_START_OFFSET = MAGIC_BYTE_LENGTH;
+
+  private static final int MAXIMUM_SCHEMA_FETCH_RETRY_COUNT = 5;
+  private static final int MINIMUM_SCHEMA_FETCH_RETRY_TIME_MILLIS = 500;
+  private static final float SCHEMA_FETCH_RETRY_EXPONENTIAL_BACKOFF_FACTOR = 2.0f;
+
+  private String[] schemaRegistryUrls;
+
+  @Override
+  public void init(Map<String, String> props, Schema indexingSchema, String topicName)
+      throws Exception {
+    schemaRegistryUrls = parseSchemaRegistryUrls(props.get(SCHEMA_REGISTRY_REST_URL));
+
+    // StringUtils.chomp returns a new string, so write the trimmed value back into the array.
+    for (int i = 0; i < schemaRegistryUrls.length; i++) {
+      schemaRegistryUrls[i] = StringUtils.chomp(schemaRegistryUrls[i], "/");
+    }
+
+    String avroSchemaName = topicName;
+    if (props.containsKey(SCHEMA_REGISTRY_SCHEMA_NAME) && props.get(SCHEMA_REGISTRY_SCHEMA_NAME) != null && !props
+        .get(SCHEMA_REGISTRY_SCHEMA_NAME).isEmpty()) {
+      avroSchemaName = props.get(SCHEMA_REGISTRY_SCHEMA_NAME);
+    }
+    // With the logic below, defaultAvroSchema may not always be the latest schema:
+    // the latest schema is fetched once at startup and is not refreshed until the
+    // next restart. However, we always look at the exact MD5 hash in each payload and
+    // attempt to fetch the schema for that particular hash before decoding an incoming
+    // kafka event. defaultAvroSchema is used only if the fetch for that MD5 fails, and
+    // in that case the fetch is retried on every subsequent event.
+    synchronized (globalSchemaCache) {
+      final String hashKey = avroSchemaName + LATEST;
+      defaultAvroSchema = globalSchemaCache.get(hashKey);
+      if (defaultAvroSchema == null) {
+        defaultAvroSchema = fetchSchema("/latest_with_type=" + avroSchemaName);
+        globalSchemaCache.put(hashKey, defaultAvroSchema);
+        LOGGER.info("Populated schema cache with schema for {}", hashKey);
+      }
+    }
+    this.avroRecordConverter = new AvroRecordToPinotRowGenerator(indexingSchema);
+    this.decoderFactory = new DecoderFactory();
+    md5ToAvroSchemaMap = new MD5AvroSchemaMap();
+  }
+
+  @Override
+  public GenericRow decode(byte[] payload, GenericRow destination) {
+    return decode(payload, 0, payload.length, destination);
+  }
+
+  @Override
+  public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
+    if (payload == null || payload.length == 0 || length == 0) {
+      return null;
+    }
+
+    System.arraycopy(payload, SCHEMA_HASH_START_OFFSET + offset, reusableMD5Bytes, 0, SCHEMA_HASH_LENGTH);
+
+    boolean schemaUpdateFailed = false;
+    org.apache.avro.Schema schema = md5ToAvroSchemaMap.getSchema(reusableMD5Bytes);
+    if (schema == null) {
+      // We will get here for the first row consumed in the segment, and every row that has a schema ID that is
+      // not yet in md5ToAvroSchemaMap.
+      synchronized (globalSchemaCache) {
+        final String hashKey = hex(reusableMD5Bytes);
+        schema = globalSchemaCache.get(hashKey);
+        if (schema == null) {
+          // We will get here only if no partition of the table has populated the global schema cache.
+          // In that case, one of the consumers will fetch the schema and populate the cache, and the others
+          // should find it in the cache and use it from there.
+          final String schemaUri = "/id=" + hex(reusableMD5Bytes);
+          try {
+            schema = fetchSchema(schemaUri);
+            globalSchemaCache.put(hashKey, schema);
+            md5ToAvroSchemaMap.addSchema(reusableMD5Bytes, schema);
+          } catch (Exception e) {
+            schema = defaultAvroSchema;
+            LOGGER
+                .error("Error fetching schema using url {}. Attempting to continue with previous schema", schemaUri, e);
+            schemaUpdateFailed = true;
+          }
+        } else {
+          LOGGER.info("Found schema for {} in cache", hashKey);
+          md5ToAvroSchemaMap.addSchema(reusableMD5Bytes, schema);
+        }
+      }
+    }
+    DatumReader<Record> reader = new GenericDatumReader<Record>(schema);
+    try {
+      Record avroRecord = reader.read(null,
+          decoderFactory.createBinaryDecoder(payload, HEADER_LENGTH + offset, length - HEADER_LENGTH, null));
+      return avroRecordConverter.transform(avroRecord, destination);
+    } catch (IOException e) {
+      LOGGER.error("Caught exception while reading message using schema {}{}",
+          (schema == null ? "null" : schema.getName()),
+          (schemaUpdateFailed ? "(possibly due to schema update failure)" : ""), e);
+      return null;
+    }
+  }
+
+  private String hex(byte[] bytes) {
+    StringBuilder builder = new StringBuilder(2 * bytes.length);
+    for (byte aByte : bytes) {
+      String hexString = Integer.toHexString(0xFF & aByte);
+      if (hexString.length() < 2) {
+        hexString = "0" + hexString;
+      }
+      builder.append(hexString);
+    }
+    return builder.toString();
+  }
+
+  private static class SchemaFetcher implements Callable<Boolean> {
+    private org.apache.avro.Schema _schema;
+    private URL url;
+    private boolean _isSuccessful = false;
+
+    SchemaFetcher(URL url) {
+      this.url = url;
+    }
+
+    @Override
+    public Boolean call()
+        throws Exception {
+      try {
+        URLConnection conn = url.openConnection();
+        conn.setConnectTimeout(15000);
+        conn.setReadTimeout(15000);
+        LOGGER.info("Fetching schema using url {}", url.toString());
+
+        StringBuilder queryResp = new StringBuilder();
+        try (BufferedReader reader = new BufferedReader(
+            new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
+          for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+            queryResp.append(line);
+          }
+        }
+
+        _schema = org.apache.avro.Schema.parse(queryResp.toString());
+
+        LOGGER.info("Schema fetch succeeded on url {}", url.toString());
+        return Boolean.TRUE;
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while fetching schema", e);
+        return Boolean.FALSE;
+      }
+    }
+
+    public org.apache.avro.Schema getSchema() {
+      return _schema;
+    }
+  }
+
+  private org.apache.avro.Schema fetchSchema(String reference)
+      throws Exception {
+    SchemaFetcher schemaFetcher = new SchemaFetcher(makeRandomUrl(reference));
+    RetryPolicies
+        .exponentialBackoffRetryPolicy(MAXIMUM_SCHEMA_FETCH_RETRY_COUNT, MINIMUM_SCHEMA_FETCH_RETRY_TIME_MILLIS,
+            SCHEMA_FETCH_RETRY_EXPONENTIAL_BACKOFF_FACTOR).attempt(schemaFetcher);
+    return schemaFetcher.getSchema();
+  }
+
+  /**
+   * Private class for encapsulating MD5 to Avro schema mapping.
+   * <ul>
+   *   <li> Maintains two lists, one for md5s and another for schema. </li>
+   *   <li> MD5 at index i in the MD5 list, corresponds to Schema at index i in the schema list. </li>
+   * </ul>
+   */
+  private static class MD5AvroSchemaMap {
+    private List<byte[]> md5s;
+    private List<org.apache.avro.Schema> schemas;
+
+    /**
+     * Constructor for the class.
+     */
+    private MD5AvroSchemaMap() {
+      md5s = new ArrayList<>();
+      schemas = new ArrayList<>();
+    }
+
+    /**
+     * Returns the Avro schema corresponding to the given MD5.
+     *
+     * @param md5ForSchema MD5 for which to get the avro schema.
+     * @return Avro schema for the given MD5.
+     */
+    private org.apache.avro.Schema getSchema(byte[] md5ForSchema) {
+      for (int i = 0; i < md5s.size(); i++) {
+        if (Arrays.equals(md5s.get(i), md5ForSchema)) {
+          return schemas.get(i);
+        }
+      }
+      return null;
+    }
+
+    /**
+     * Adds mapping between MD5 and Avro schema.
+     * Caller to ensure that addSchema is called only once per MD5-Schema pair.
+     *
+     * @param md5 MD5 for the Schema
+     * @param schema Avro Schema
+     */
+    private void addSchema(byte[] md5, org.apache.avro.Schema schema) {
+      md5s.add(Arrays.copyOf(md5, md5.length));
+      schemas.add(schema);
+    }
+  }
+
+  protected URL makeRandomUrl(String reference)
+      throws MalformedURLException {
+    Random rand = new Random();
+    int randomInteger = rand.nextInt(schemaRegistryUrls.length);
+    return new URL(schemaRegistryUrls[randomInteger] + reference);
+  }
+
+  protected String[] parseSchemaRegistryUrls(String schemaConfig) {
+    return schemaConfig.split(",");
+  }
+}
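
As a reading aid for the decoder above: the constants imply a payload layout of one magic byte, followed by a 16-byte MD5 of the writer schema, followed by the Avro binary body. A rough sketch of building such a payload on the producer side, assuming the usual imports from java.io, java.security, java.nio.charset, org.apache.avro.io and org.apache.avro.generic (the magic byte value and hashing the schema's string form are illustrative assumptions, not taken from this patch):

    // Hypothetical helper: wraps an Avro record in [magic byte][16-byte schema MD5][Avro binary].
    static byte[] encodeForDecoder(org.apache.avro.Schema schema, GenericRecord record) throws Exception {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      out.write(0x0);                                                    // MAGIC_BYTE_LENGTH = 1
      out.write(MessageDigest.getInstance("MD5")
          .digest(schema.toString().getBytes(StandardCharsets.UTF_8)));  // SCHEMA_HASH_LENGTH = 16
      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
      new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
      encoder.flush();
      return out.toByteArray();                                          // what decode(payload, destination) expects
    }
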
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConnectionHandler.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConnectionHandler.java
deleted file mode 100644
index 802062f..0000000
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConnectionHandler.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.realtime.impl.kafka2;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.BytesDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Properties;
-
-public abstract class KafkaConnectionHandler {
-
-    protected final KafkaPartitionLevelStreamConfig _config;
-    protected final int _partition;
-    protected final String _topic;
-    protected final Consumer<String, byte[]> _consumer;
-    protected final TopicPartition _topicPartition;
-
-    public KafkaConnectionHandler(StreamConfig streamConfig, int partition) {
-        _config = new KafkaPartitionLevelStreamConfig(streamConfig);
-        _partition = partition;
-        _topic = _config.getKafkaTopicName();
-        Properties consumerProp = new Properties();
-        consumerProp.putAll(streamConfig.getStreamConfigsMap());
-        consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
-        consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
-        _consumer = new KafkaConsumer<>(consumerProp);
-        _topicPartition = new TopicPartition(_topic, _partition);
-        _consumer.assign(Collections.singletonList(_topicPartition));
-
-    }
-
-    public void close() throws IOException {
-        _consumer.close();
-    }
-
-
-}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaJSONMessageDecoder.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaJSONMessageDecoder.java
new file mode 100644
index 0000000..8d1fd96
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaJSONMessageDecoder.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class KafkaJSONMessageDecoder implements StreamMessageDecoder<byte[]> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaJSONMessageDecoder.class);
+
+  private Schema schema;
+
+  @Override
+  public void init(Map<String, String> props, Schema indexingSchema, String topicName)
+      throws Exception {
+    this.schema = indexingSchema;
+  }
+
+  @Override
+  public GenericRow decode(byte[] payload, GenericRow destination) {
+    try {
+      JsonNode message = JsonUtils.bytesToJsonNode(payload);
+      for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+        String column = fieldSpec.getName();
+        destination.putField(column, JsonUtils.extractValue(message.get(column), fieldSpec));
+      }
+      return destination;
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while decoding row, discarding row.", e);
+      return null;
+    }
+  }
+
+  @Override
+  public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
+    return decode(Arrays.copyOfRange(payload, offset, offset + length), destination);
+  }
+}
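
A quick usage sketch for the JSON decoder above (the schema fields and the JSON payload are made up for illustration, and the SchemaBuilder calls assume the org.apache.pinot.common.data.Schema API used elsewhere in this module):

    Schema schema = new Schema.SchemaBuilder().setSchemaName("clicks")
        .addSingleValueDimension("campaign", FieldSpec.DataType.STRING)
        .addMetric("clicks", FieldSpec.DataType.LONG).build();
    KafkaJSONMessageDecoder decoder = new KafkaJSONMessageDecoder();
    decoder.init(Collections.emptyMap(), schema, "myTopic");
    byte[] payload = "{\"campaign\":\"summer\",\"clicks\":42}".getBytes(StandardCharsets.UTF_8);
    // Each schema column is extracted from the JSON message into the destination row.
    GenericRow row = decoder.decode(payload, new GenericRow());
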
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaMessageBatch.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaMessageBatch.java
deleted file mode 100644
index 22aa683..0000000
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaMessageBatch.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.realtime.impl.kafka2;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.pinot.core.realtime.stream.MessageBatch;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class KafkaMessageBatch implements MessageBatch<byte[]> {
-
-    private List<MessageAndOffset> messageList = new ArrayList<>();
-
-    public KafkaMessageBatch(Iterable<ConsumerRecord<String, byte[]>> iterable) {
-        for (ConsumerRecord<String, byte[]> record : iterable) {
-            messageList.add(new MessageAndOffset(record.value(), record.offset()));
-        }
-    }
-
-    @Override
-    public int getMessageCount() {
-        return messageList.size();
-    }
-
-    @Override
-    public byte[] getMessageAtIndex(int index) {
-        return messageList.get(index).getMessage().array();
-    }
-
-    @Override
-    public int getMessageOffsetAtIndex(int index) {
-        return messageList.get(index).getMessage().arrayOffset();
-    }
-
-    @Override
-    public int getMessageLengthAtIndex(int index) {
-        return messageList.get(index).getMessage().array().length;
-    }
-
-    @Override
-    public long getNextStreamMessageOffsetAtIndex(int index) {
-        return messageList.get(index).getNextOffset();
-    }
-
-    public Iterable<MessageAndOffset> iterable() {
-        return messageList;
-    }
-}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionConsumer.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionConsumer.java
deleted file mode 100644
index de3295d..0000000
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionConsumer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.realtime.impl.kafka2;
-
-import org.apache.kafka.clients.consumer.*;
-
-import org.apache.pinot.core.realtime.stream.MessageBatch;
-import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeoutException;
-
-public class KafkaPartitionConsumer extends KafkaConnectionHandler implements PartitionLevelConsumer {
-
-
-    public KafkaPartitionConsumer(StreamConfig streamConfig, int partition) {
-        super(streamConfig, partition);
-    }
-
-    @Override
-    public MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis) throws TimeoutException {
-        _consumer.seek(_topicPartition, startOffset);
-
-        ConsumerRecords<String, byte[]> consumerRecords = _consumer.poll(null);
-        List<ConsumerRecord<String, byte[]>> records = consumerRecords.records(_topicPartition);
-        return new KafkaMessageBatch(records);
-    }
-
-    @Override
-    public void close() throws IOException {
-        super.close();
-    }
-}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfig.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfig.java
deleted file mode 100644
index c154a38..0000000
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfig.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.realtime.impl.kafka2;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang.StringUtils;
-import org.apache.pinot.common.utils.EqualityUtils;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-
-import java.util.Map;
-import java.util.Properties;
-
-public class KafkaPartitionLevelStreamConfig {
-
-    private final String _kafkaTopicName;
-    private final String _bootstrapHosts;
-    private final int _kafkaBufferSize;
-    private final int _kafkaSocketTimeout;
-    private final int _kafkaFetcherSizeBytes;
-    private final int _kafkaFetcherMinBytes;
-    private final Map<String, String> _streamConfigMap;
-
-    /**
-     * Builds a wrapper around {@link StreamConfig} to fetch kafka partition level consumer related configs
-     * @param streamConfig
-     */
-    public KafkaPartitionLevelStreamConfig(StreamConfig streamConfig) {
-        _streamConfigMap = streamConfig.getStreamConfigsMap();
-
-        _kafkaTopicName = streamConfig.getTopicName();
-
-        String llcBrokerListKey = KafkaStreamConfigProperties
-                .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST);
-        String llcBufferKey = KafkaStreamConfigProperties
-                .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE);
-        String llcTimeoutKey = KafkaStreamConfigProperties
-                .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT);
-        String fetcherSizeKey = KafkaStreamConfigProperties
-                .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_SIZE_BYTES);
-        String fetcherMinBytesKey = KafkaStreamConfigProperties
-                .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES);
-        _bootstrapHosts = _streamConfigMap.get(llcBrokerListKey);
-        _kafkaBufferSize = getIntConfigWithDefault(_streamConfigMap, llcBufferKey,
-                KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT);
-        _kafkaSocketTimeout = getIntConfigWithDefault(_streamConfigMap, llcTimeoutKey,
-                KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT);
-        _kafkaFetcherSizeBytes = getIntConfigWithDefault(_streamConfigMap, fetcherSizeKey, _kafkaBufferSize);
-        _kafkaFetcherMinBytes = getIntConfigWithDefault(_streamConfigMap, fetcherMinBytesKey,
-                KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT);
-        Preconditions.checkNotNull(_bootstrapHosts,
-                "Must specify kafka brokers list " + llcBrokerListKey + " in case of low level kafka consumer");
-    }
-
-    public String getKafkaTopicName() {
-        return _kafkaTopicName;
-    }
-
-    public String getBootstrapHosts() {
-        return _bootstrapHosts;
-    }
-
-    public int getKafkaBufferSize() {
-        return _kafkaBufferSize;
-    }
-
-    public int getKafkaSocketTimeout() {
-        return _kafkaSocketTimeout;
-    }
-
-    public int getKafkaFetcherSizeBytes() {
-        return _kafkaFetcherSizeBytes;
-    }
-
-    public int getKafkaFetcherMinBytes() {
-        return _kafkaFetcherMinBytes;
-    }
-
-    private int getIntConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) {
-        String stringValue = configMap.get(key);
-        try {
-            if (StringUtils.isNotEmpty(stringValue)) {
-                return Integer.parseInt(stringValue);
-            }
-            return defaultValue;
-        } catch (NumberFormatException ex) {
-            return defaultValue;
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "KafkaLowLevelStreamConfig{" + "_kafkaTopicName='" + _kafkaTopicName + '\'' + ", _bootstrapHosts='"
-                + _bootstrapHosts + '\'' + ", _kafkaBufferSize='" + _kafkaBufferSize + '\'' + ", _kafkaSocketTimeout='"
-                + _kafkaSocketTimeout + '\'' + ", _kafkaFetcherSizeBytes='" + _kafkaFetcherSizeBytes + '\'' + ", _kafkaFetcherMinBytes='"
-                + _kafkaFetcherMinBytes + '\'' + '}';
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (EqualityUtils.isSameReference(this, o)) {
-            return true;
-        }
-
-        if (EqualityUtils.isNullOrNotSameClass(this, o)) {
-            return false;
-        }
-
-        KafkaPartitionLevelStreamConfig that = (KafkaPartitionLevelStreamConfig) o;
-
-        return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName) && EqualityUtils
-                .isEqual(_bootstrapHosts, that._bootstrapHosts) && EqualityUtils
-                .isEqual(_kafkaBufferSize, that._kafkaBufferSize) && EqualityUtils
-                .isEqual(_kafkaSocketTimeout, that._kafkaSocketTimeout) && EqualityUtils
-                .isEqual(_kafkaFetcherSizeBytes, that._kafkaFetcherSizeBytes) && EqualityUtils
-                .isEqual(_kafkaFetcherMinBytes, that._kafkaFetcherMinBytes);
-    }
-
-    @Override
-    public int hashCode() {
-        int result = EqualityUtils.hashCodeOf(_kafkaTopicName);
-        result = EqualityUtils.hashCodeOf(result, _bootstrapHosts);
-        result = EqualityUtils.hashCodeOf(result, _kafkaBufferSize);
-        result = EqualityUtils.hashCodeOf(result, _kafkaSocketTimeout);
-        result = EqualityUtils.hashCodeOf(result, _kafkaFetcherSizeBytes);
-        result = EqualityUtils.hashCodeOf(result, _kafkaFetcherMinBytes);
-        return result;
-    }
-}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamMetadataProvider.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamMetadataProvider.java
deleted file mode 100644
index 3871d85..0000000
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamMetadataProvider.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.realtime.impl.kafka2;
-
-import com.google.common.base.Preconditions;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.DescribeTopicsResult;
-import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.pinot.core.realtime.stream.OffsetCriteria;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
-
-import javax.annotation.Nonnull;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-
-public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implements StreamMetadataProvider {
-
-    private AdminClient _adminClient;
-
-    public KafkaStreamMetadataProvider(StreamConfig streamConfig, int partition) {
-        super(streamConfig, partition);
-        final Properties props = new Properties();
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
-        _adminClient = AdminClient.create(props);
-    }
-
-    @Override
-    public int fetchPartitionCount(long timeoutMillis) {
-        DescribeTopicsResult result = _adminClient.describeTopics(Collections.singletonList(_config.getKafkaTopicName()));
-        Map<String, KafkaFuture<TopicDescription>> values = result.values();
-        KafkaFuture<TopicDescription> topicDescription = values.get(_config.getKafkaTopicName());
-        try {
-            return topicDescription.get().partitions().size();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new RuntimeException("");
-        }
-    }
-
-    @Override
-    public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) throws TimeoutException {
-
-        Preconditions.checkNotNull(offsetCriteria);
-        if (offsetCriteria.isLargest()) {
-            return _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis)).get(_topicPartition);
-        } else if (offsetCriteria.isSmallest()) {
-            return _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis)).get(_topicPartition);
-        } else {
-            throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria.toString());
-        }
-
-    }
-
-    @Override
-    public void close() throws IOException {
-        super.close();
-    }
-}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/MessageAndOffset.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/MessageAndOffset.java
index 0dea267..b5bdaba 100644
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/MessageAndOffset.java
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/MessageAndOffset.java
@@ -20,30 +20,34 @@ package org.apache.pinot.core.realtime.impl.kafka2;
 
 import java.nio.ByteBuffer;
 
+
 public class MessageAndOffset {
 
-    private ByteBuffer _message;
-    private long _offset;
+  private ByteBuffer _message;
+  private long _offset;
+
+  public MessageAndOffset(byte[] message, long offset) {
+    this(ByteBuffer.wrap(message), offset);
+  }
 
-    public MessageAndOffset(byte[] message, long offset) {
-        _message = ByteBuffer.wrap(message);
-        _offset = offset;
-    }
+  public MessageAndOffset(ByteBuffer message, long offset) {
+    _message = message;
+    _offset = offset;
+  }
 
-    public MessageAndOffset(ByteBuffer message, long offset) {
-        _message = message;
-        _offset = offset;
-    }
+  public ByteBuffer getMessage() {
+    return _message;
+  }
 
-    public ByteBuffer getMessage() {
-        return _message;
-    }
+  public long getOffset() {
+    return _offset;
+  }
 
-    public long getOffset() {
-        return _offset;
-    }
+  public long getNextOffset() {
+    return getOffset() + 1;
+  }
 
-    public long getNextOffset() {
-        return _offset + 1;
-    }
+  public int payloadSize() {
+    return getMessage().array().length;
+  }
 }
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConsumerTest.java b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConsumerTest.java
new file mode 100644
index 0000000..cc28127
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConsumerTest.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pinot.core.realtime.impl.kafka2.utils.MiniKafkaCluster;
+import org.apache.pinot.core.realtime.stream.OffsetCriteria;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for the KafkaPartitionLevelConsumer.
+ */
+public class KafkaPartitionLevelConsumerTest {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumerTest.class);
+  private static final long STABILIZE_SLEEP_DELAYS = 3000;
+  private static final String TEST_TOPIC_1 = "foo";
+  private static final String TEST_TOPIC_2 = "bar";
+  private static final int NUM_MSG_PRODUCED = 1000;
+
+  private static MiniKafkaCluster kafkaCluster;
+  private static String brokerAddress;
+
+  @BeforeClass
+  public static void setup()
+      throws Exception {
+    kafkaCluster = new MiniKafkaCluster.Builder().newServer("0").build();
+    LOGGER.info("Trying to start MiniKafkaCluster");
+    kafkaCluster.start();
+    brokerAddress = getKafkaBroker();
+    kafkaCluster.createTopic(TEST_TOPIC_1, 1, 1);
+    kafkaCluster.createTopic(TEST_TOPIC_2, 2, 1);
+    Thread.sleep(STABILIZE_SLEEP_DELAYS);
+    produceMsgToKafka();
+    Thread.sleep(STABILIZE_SLEEP_DELAYS);
+  }
+
+  private static void produceMsgToKafka() {
+    Properties props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBroker());
+    props.put(ProducerConfig.CLIENT_ID_CONFIG, "clientId");
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+    KafkaProducer<String, String> p = new KafkaProducer<>(props);
+    for (int i = 0; i < NUM_MSG_PRODUCED; i++) {
+      p.send(new ProducerRecord<>(TEST_TOPIC_1, "sample_msg_" + i));
+      p.send(new ProducerRecord<>(TEST_TOPIC_2, "sample_msg_" + i));
+    }
+    // Close the producer to flush pending sends before the tests read offsets.
+    p.close();
+  }
+
+  private static String getKafkaBroker() {
+    return "127.0.0.1:" + kafkaCluster.getKafkaServerPort(0);
+  }
+
+  @AfterClass
+  public static void shutDown()
+      throws Exception {
+    kafkaCluster.deleteTopic(TEST_TOPIC_1);
+    kafkaCluster.deleteTopic(TEST_TOPIC_2);
+    kafkaCluster.close();
+  }
+
+  @Test
+  public void testBuildConsumer()
+      throws Exception {
+    String streamType = "kafka";
+    String streamKafkaTopicName = "theTopic";
+    String streamKafkaBrokerList = "127.0.0.1:" + kafkaCluster.getKafkaServerPort(0);
+    String streamKafkaConsumerType = "simple";
+    String clientId = "clientId";
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", streamKafkaTopicName);
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", Kafka2ConsumerFactory.class.getName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    streamConfigMap.put("stream.kafka.fetcher.size", "10000");
+    streamConfigMap.put("stream.kafka.fetcher.minBytes", "20000");
+    StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+
+    Kafka2PartitionLevelStreamMetadataProvider streamMetadataProvider =
+        new Kafka2PartitionLevelStreamMetadataProvider(clientId, streamConfig);
+
+    // test default value
+    Kafka2PartitionLevelPartitionLevelConsumer kafkaSimpleStreamConsumer =
+        new Kafka2PartitionLevelPartitionLevelConsumer(clientId, streamConfig, 0);
+    kafkaSimpleStreamConsumer.fetchMessages(12345L, 23456L, 10000);
+
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+        kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaBufferSize());
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+        kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaSocketTimeout());
+
+    // test parsing values
+    Assert.assertEquals(10000,
+        kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaFetcherSizeBytes());
+    Assert
+        .assertEquals(20000, kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaFetcherMinBytes());
+
+    // test user defined values
+    streamConfigMap.put("stream.kafka.buffer.size", "100");
+    streamConfigMap.put("stream.kafka.socket.timeout", "1000");
+    streamConfig = new StreamConfig(streamConfigMap);
+    kafkaSimpleStreamConsumer = new Kafka2PartitionLevelPartitionLevelConsumer(clientId, streamConfig, 0);
+    kafkaSimpleStreamConsumer.fetchMessages(12345L, 23456L, 10000);
+    Assert.assertEquals(100, kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaBufferSize());
+    Assert.assertEquals(1000, kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaSocketTimeout());
+  }
+
+  @Test
+  public void testGetPartitionCount() {
+    String streamType = "kafka";
+    String streamKafkaBrokerList = "127.0.0.1:" + kafkaCluster.getKafkaServerPort(0);
+    String streamKafkaConsumerType = "simple";
+    String clientId = "clientId";
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", TEST_TOPIC_1);
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", Kafka2ConsumerFactory.class.getName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+
+    Kafka2PartitionLevelStreamMetadataProvider streamMetadataProvider =
+        new Kafka2PartitionLevelStreamMetadataProvider(clientId, streamConfig);
+    Assert.assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 1);
+
+    streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", TEST_TOPIC_2);
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", Kafka2ConsumerFactory.class.getName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    streamConfig = new StreamConfig(streamConfigMap);
+
+    streamMetadataProvider = new Kafka2PartitionLevelStreamMetadataProvider(clientId, streamConfig);
+    Assert.assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 2);
+  }
+
+  @Test
+  public void testFetchMessages()
+      throws Exception {
+    String streamType = "kafka";
+    String streamKafkaTopicName = "theTopic";
+    String streamKafkaBrokerList = "127.0.0.1:" + kafkaCluster.getKafkaServerPort(0);
+    String streamKafkaConsumerType = "simple";
+    String clientId = "clientId";
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", streamKafkaTopicName);
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", Kafka2ConsumerFactory.class.getName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+
+    int partition = 0;
+    Kafka2PartitionLevelPartitionLevelConsumer kafkaSimpleStreamConsumer =
+        new Kafka2PartitionLevelPartitionLevelConsumer(clientId, streamConfig, partition);
+    kafkaSimpleStreamConsumer.fetchMessages(12345L, 23456L, 10000);
+  }
+
+  @Test
+  public void testFetchOffsets()
+      throws Exception {
+    testFetchOffsets(TEST_TOPIC_1);
+    testFetchOffsets(TEST_TOPIC_2);
+  }
+
+  private void testFetchOffsets(String topic)
+      throws Exception {
+    String streamType = "kafka";
+    String streamKafkaBrokerList = "127.0.0.1:" + kafkaCluster.getKafkaServerPort(0);
+    String streamKafkaConsumerType = "simple";
+    String clientId = "clientId";
+
+    Map<String, String> streamConfigMap = new HashMap<>();
+    streamConfigMap.put("streamType", streamType);
+    streamConfigMap.put("stream.kafka.topic.name", topic);
+    streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+    streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
+    streamConfigMap.put("stream.kafka.consumer.factory.class.name", Kafka2ConsumerFactory.class.getName());
+    streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+    StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+
+    int numPartitions =
+        new Kafka2PartitionLevelStreamMetadataProvider(clientId, streamConfig).fetchPartitionCount(10000);
+    for (int partition = 0; partition < numPartitions; partition++) {
+      Kafka2PartitionLevelStreamMetadataProvider kafkaStreamMetadataProvider =
+          new Kafka2PartitionLevelStreamMetadataProvider(clientId, streamConfig, partition);
+      Assert.assertEquals(0, kafkaStreamMetadataProvider
+          .fetchPartitionOffset(new OffsetCriteria.OffsetCriteriaBuilder().withOffsetSmallest(), 10000));
+      Assert.assertEquals(NUM_MSG_PRODUCED / numPartitions, kafkaStreamMetadataProvider
+          .fetchPartitionOffset(new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest(), 10000));
+    }
+  }
+}
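
The offset assertions above depend on how the metadata provider translates OffsetCriteria into Kafka 2.0 offset lookups; the expected mapping, mirroring the deleted KafkaStreamMetadataProvider earlier in this diff, is roughly the following (a sketch only, not the provider's actual code):

    TopicPartition tp = new TopicPartition(TEST_TOPIC_1, 0);
    // "smallest" -> earliest retained offset, "largest" -> offset right after the last produced record.
    long smallest = consumer.beginningOffsets(Collections.singletonList(tp), Duration.ofMillis(10000)).get(tp);
    long largest = consumer.endOffsets(Collections.singletonList(tp), Duration.ofMillis(10000)).get(tp);
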
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfigTest.java b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfigTest.java
new file mode 100644
index 0000000..df02b9f
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfigTest.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class KafkaPartitionLevelStreamConfigTest {
+
+  private Kafka2PartitionLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer,
+      String socketTimeout) {
+    return getStreamConfig(topic, bootstrapHosts, buffer, socketTimeout, null, null);
+  }
+
+  private Kafka2PartitionLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer,
+      String socketTimeout, String fetcherSize, String fetcherMinBytes) {
+    Map<String, String> streamConfigMap = new HashMap<>();
+    String streamType = "kafka";
+    String consumerType = StreamConfig.ConsumerType.LOWLEVEL.toString();
+    String consumerFactoryClassName = Kafka2ConsumerFactory.class.getName();
+    String decoderClass = KafkaAvroMessageDecoder.class.getName();
+    streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
+    streamConfigMap
+        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
+            topic);
+    streamConfigMap
+        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+            consumerType);
+    streamConfigMap.put(StreamConfigProperties
+            .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
+        consumerFactoryClassName);
+    streamConfigMap
+        .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+            decoderClass);
+    streamConfigMap.put("stream.kafka.broker.list", bootstrapHosts);
+    if (buffer != null) {
+      streamConfigMap.put("stream.kafka.buffer.size", buffer);
+    }
+    if (socketTimeout != null) {
+      streamConfigMap.put("stream.kafka.socket.timeout", socketTimeout);
+    }
+    if (fetcherSize != null) {
+      streamConfigMap.put("stream.kafka.fetcher.size", fetcherSize);
+    }
+    if (fetcherMinBytes != null) {
+      streamConfigMap.put("stream.kafka.fetcher.minBytes", fetcherMinBytes);
+    }
+    return new Kafka2PartitionLevelStreamConfig(new StreamConfig(streamConfigMap));
+  }
+
+  @Test
+  public void testGetKafkaTopicName() {
+    Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "", "", "");
+    Assert.assertEquals("topic", config.getKafkaTopicName());
+  }
+
+  @Test
+  public void testGetBootstrapHosts() {
+    Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", "");
+    Assert.assertEquals("host1", config.getBootstrapHosts());
+  }
+
+  @Test
+  public void testGetKafkaBufferSize() {
+    // test default
+    Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "host1", null, "");
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+        config.getKafkaBufferSize());
+
+    config = getStreamConfig("topic", "host1", "", "");
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+        config.getKafkaBufferSize());
+
+    config = getStreamConfig("topic", "host1", "bad value", "");
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+        config.getKafkaBufferSize());
+
+    // correct config
+    config = getStreamConfig("topic", "host1", "100", "");
+    Assert.assertEquals(100, config.getKafkaBufferSize());
+  }
+
+  @Test
+  public void testGetKafkaSocketTimeout() {
+    // test default
+    Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", null);
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+        config.getKafkaSocketTimeout());
+
+    config = getStreamConfig("topic", "host1", "", "");
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+        config.getKafkaSocketTimeout());
+
+    config = getStreamConfig("topic", "host1", "", "bad value");
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+        config.getKafkaSocketTimeout());
+
+    // correct config
+    config = getStreamConfig("topic", "host1", "", "100");
+    Assert.assertEquals(100, config.getKafkaSocketTimeout());
+  }
+
+  @Test
+  public void testGetFetcherSize() {
+    // test default
+    Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", "", "", null);
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+        config.getKafkaFetcherSizeBytes());
+
+    config = getStreamConfig("topic", "host1", "100", "", "", null);
+    Assert.assertEquals(100, config.getKafkaFetcherSizeBytes());
+
+    config = getStreamConfig("topic", "host1", "100", "", "bad value", null);
+    Assert.assertEquals(100, config.getKafkaFetcherSizeBytes());
+
+    // correct config
+    config = getStreamConfig("topic", "host1", "100", "", "200", null);
+    Assert.assertEquals(200, config.getKafkaFetcherSizeBytes());
+  }
+
+  @Test
+  public void testGetFetcherMinBytes() {
+    // test default
+    Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", "", "", null);
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
+        config.getKafkaFetcherMinBytes());
+
+    config = getStreamConfig("topic", "host1", "", "", "", "");
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
+        config.getKafkaFetcherMinBytes());
+
+    config = getStreamConfig("topic", "host1", "", "", "", "bad value");
+    Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
+        config.getKafkaFetcherMinBytes());
+
+    // correct config
+    config = getStreamConfig("topic", "host1", "", "", "", "100");
+    Assert.assertEquals(100, config.getKafkaFetcherMinBytes());
+  }
+}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/EmbeddedZooKeeper.java b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/EmbeddedZooKeeper.java
new file mode 100644
index 0000000..47370aa
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/EmbeddedZooKeeper.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2.utils;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+
+public class EmbeddedZooKeeper implements Closeable {
+
+  private static final int TICK_TIME = 500;
+  private final NIOServerCnxnFactory factory;
+  private final ZooKeeperServer zookeeper;
+  private final File tmpDir;
+  private final int port;
+
+  EmbeddedZooKeeper() throws IOException, InterruptedException {
+    this.tmpDir = Files.createTempDirectory(null).toFile();
+    this.factory = new NIOServerCnxnFactory();
+    this.zookeeper = new ZooKeeperServer(new File(tmpDir, "data"), new File(tmpDir, "log"),
+        TICK_TIME);
+    InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 0);
+    factory.configure(addr, 0);
+    factory.startup(zookeeper);
+    this.port = zookeeper.getClientPort();
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  @Override
+  public void close() throws IOException {
+    zookeeper.shutdown();
+    factory.shutdown();
+    FileUtils.deleteDirectory(tmpDir);
+  }
+}
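
Note (illustrative, not part of the commit): a minimal sketch of how a test could use this embedded ZooKeeper. The constructor is package-private, so the caller must live in org.apache.pinot.core.realtime.impl.kafka2.utils; the class name EmbeddedZooKeeperUsageSketch is hypothetical.

    package org.apache.pinot.core.realtime.impl.kafka2.utils;

    public class EmbeddedZooKeeperUsageSketch {
      public static void main(String[] args) throws Exception {
        // The constructor already starts the server on an ephemeral 127.0.0.1 port.
        try (EmbeddedZooKeeper zk = new EmbeddedZooKeeper()) {
          System.out.println("ZooKeeper listening on 127.0.0.1:" + zk.getPort());
        } // close() shuts the server down and deletes the temp directory
      }
    }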
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/MiniKafkaCluster.java b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/MiniKafkaCluster.java
new file mode 100644
index 0000000..3ec32fc
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/MiniKafkaCluster.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2.utils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.Seq;
+
+
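+/**
+ * Single-process Kafka cluster for tests: one embedded ZooKeeper plus one
+ * KafkaServer per broker id passed to the Builder, with an AdminClient for
+ * topic creation and deletion. close() stops the brokers and ZooKeeper and
+ * removes the temporary log directories.
+ */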
+public final class MiniKafkaCluster implements Closeable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(MiniKafkaCluster.class);
+  private final EmbeddedZooKeeper zkServer;
+  private final ArrayList<KafkaServer> kafkaServer;
+  private final Path tempDir;
+  private final AdminClient adminClient;
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private MiniKafkaCluster(List<String> brokerIds)
+      throws IOException, InterruptedException {
+    this.zkServer = new EmbeddedZooKeeper();
+    this.tempDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "mini-kafka-cluster");
+    this.kafkaServer = new ArrayList<>();
+    int port = 0;
+    for (String id : brokerIds) {
+      port = getAvailablePort();
+      KafkaConfig c = new KafkaConfig(createBrokerConfig(id, port));
+      Seq seq =
+          scala.collection.JavaConverters.collectionAsScalaIterableConverter(Collections.emptyList()).asScala().toSeq();
+      kafkaServer.add(new KafkaServer(c, Time.SYSTEM, Option.empty(), seq));
+    }
+    Properties props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + port);
+    adminClient = AdminClient.create(props);
+  }
+
+  static int getAvailablePort() {
+    try (ServerSocket socket = new ServerSocket(0)) {
+      return socket.getLocalPort();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to find available port to use", e);
+    }
+  }
+
+  private Properties createBrokerConfig(String nodeId, int port)
+      throws IOException {
+    Properties props = new Properties();
+    props.put("broker.id", nodeId);
+    props.put("port", Integer.toString(port));
+    props.put("log.dir", Files.createTempDirectory(tempDir, "broker-").toAbsolutePath().toString());
+    props.put("zookeeper.connect", "127.0.0.1:" + zkServer.getPort());
+    props.put("replica.socket.timeout.ms", "1500");
+    props.put("controller.socket.timeout.ms", "1500");
+    props.put("controlled.shutdown.enable", "true");
+    props.put("delete.topic.enable", "true");
+    props.put("auto.create.topics.enable", "true");
+    props.put("offsets.topic.replication.factor", "1");
+    props.put("controlled.shutdown.retry.backoff.ms", "100");
+    props.put("log.cleaner.dedupe.buffer.size", "2097152");
+    return props;
+  }
+
+  public void start() {
+    for (KafkaServer s : kafkaServer) {
+      s.startup();
+    }
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    for (KafkaServer s : kafkaServer) {
+      s.shutdown();
+    }
+    this.zkServer.close();
+    FileUtils.deleteDirectory(tempDir.toFile());
+  }
+
+  public EmbeddedZooKeeper getZkServer() {
+    return zkServer;
+  }
+
+  public List<KafkaServer> getKafkaServer() {
+    return kafkaServer;
+  }
+
+  public int getKafkaServerPort(int index) {
+    return kafkaServer.get(index).socketServer()
+        .boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT));
+  }
+
+  public AdminClient getAdminClient() {
+    return adminClient;
+  }
+
+  public boolean createTopic(String topicName, int numPartitions, int replicationFactor) {
+    NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) replicationFactor);
+    CreateTopicsResult createTopicsResult = this.adminClient.createTopics(Arrays.asList(newTopic));
+    try {
+      createTopicsResult.all().get();
+    } catch (InterruptedException | ExecutionException e) {
+      LOGGER.error("Failed to create Kafka topic: {}, Exception: {}", newTopic.toString(), e);
+      return false;
+    }
+    return true;
+  }
+
+  public boolean deleteTopic(String topicName) {
+    final DeleteTopicsResult deleteTopicsResult = this.adminClient.deleteTopics(Collections.singletonList(topicName));
+    try {
+      deleteTopicsResult.all().get();
+    } catch (InterruptedException | ExecutionException e) {
+      LOGGER.error("Failed to delete Kafka topic: {}, Exception: {}", topicName, e);
+      return false;
+    }
+    return true;
+  }
+
+  public static class Builder {
+
+    private List<String> brokerIds = new ArrayList<>();
+
+    public Builder newServer(String brokerId) {
+      brokerIds.add(brokerId);
+      return this;
+    }
+
+    public MiniKafkaCluster build()
+        throws IOException, InterruptedException {
+      return new MiniKafkaCluster(brokerIds);
+    }
+  }
+}
\ No newline at end of file
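
Note (illustrative, not part of the commit): a rough sketch of the intended life cycle of MiniKafkaCluster in a test. The broker id "0" and the topic name "testTopic" are arbitrary example values, and the class name MiniKafkaClusterUsageSketch is hypothetical.

    import org.apache.pinot.core.realtime.impl.kafka2.utils.MiniKafkaCluster;

    public class MiniKafkaClusterUsageSketch {
      public static void main(String[] args) throws Exception {
        try (MiniKafkaCluster cluster = new MiniKafkaCluster.Builder().newServer("0").build()) {
          cluster.start();
          // Topic administration goes through the cluster's AdminClient.
          boolean created = cluster.createTopic("testTopic", 1, 1);
          String brokerList = "127.0.0.1:" + cluster.getKafkaServerPort(0);
          System.out.println("created=" + created + ", brokers=" + brokerList);
          cluster.deleteTopic("testTopic");
        } // close() stops the brokers and ZooKeeper and removes temp dirs
      }
    }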
diff --git a/pinot-connectors/pom.xml b/pinot-connectors/pom.xml
index 64d798d..1b45cc1 100644
--- a/pinot-connectors/pom.xml
+++ b/pinot-connectors/pom.xml
@@ -47,11 +47,23 @@
       <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-common</artifactId>
       <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.scala-lang</groupId>
+          <artifactId>scala-library</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-core</artifactId>
       <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.scala-lang</groupId>
+          <artifactId>scala-library</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <!-- test -->

