You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2019/07/18 00:28:38 UTC

[incubator-pinot] branch refactor_kafka created (now 3226348)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a change to branch refactor_kafka
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 3226348  Refactor pinot-connectors to break the dependencies from kafka 0.9

This branch includes the following new commits:

     new 3226348  Refactor pinot-connectors to break the dependencies from kafka 0.9

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Refactor pinot-connectors to break the dependencies from kafka 0.9

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch refactor_kafka
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 3226348eab493d829215a8091e40a8caf7c0944d
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Wed Jul 17 17:28:20 2019 -0700

    Refactor pinot-connectors to break the dependencies from kafka 0.9
---
 pinot-common/pom.xml                               |   4 -
 pinot-connectors/pinot-connector-kafka-0.9/pom.xml |   7 +-
 .../impl/kafka/server/KafkaDataProducer.java       |  57 +++++++++
 .../KafkaDataServerStartable.java}                 | 136 +++++++++++++--------
 pinot-core/pom.xml                                 |   4 -
 .../core/realtime/stream/StreamDataProducer.java   |  35 ++++++
 .../core/realtime/stream/StreamDataProvider.java   |  45 +++++++
 .../realtime/stream/StreamDataServerStartable.java |  33 +++++
 .../function/FunctionExpressionEvaluatorTest.java  |   1 -
 pinot-integration-tests/pom.xml                    |   2 +-
 .../tests/BaseClusterIntegrationTest.java          |  25 ++--
 .../pinot/integration/tests/CommonKafkaUtils.java  | 102 ++++++++++++++++
 .../ControllerPeriodicTasksIntegrationTests.java   |   3 +-
 .../tests/HybridClusterIntegrationTest.java        |   3 +-
 ...ridClusterIntegrationTestCommandLineRunner.java |  13 +-
 .../tests/RealtimeClusterIntegrationTest.java      |   3 +-
 pinot-perf/pom.xml                                 |   2 +-
 .../perf/BenchmarkRealtimeConsumptionSpeed.java    |  13 +-
 .../org/apache/pinot/perf/RealtimeStressTest.java  |  16 +--
 pinot-tools/pom.xml                                |   3 +-
 .../org/apache/pinot/tools/HybridQuickstart.java   |  40 +++---
 .../org/apache/pinot/tools/RealtimeQuickStart.java |  35 ++++--
 .../tools/admin/command/StartKafkaCommand.java     |  20 ++-
 .../admin/command/StreamAvroIntoKafkaCommand.java  |  25 ++--
 .../pinot/tools/streams/AirlineDataStream.java     |  21 ++--
 .../pinot/tools/streams/MeetupRsvpStream.java      |  29 +++--
 pom.xml                                            |   9 +-
 27 files changed, 503 insertions(+), 183 deletions(-)

diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml
index 2e27ac8..41c9b39 100644
--- a/pinot-common/pom.xml
+++ b/pinot-common/pom.xml
@@ -198,10 +198,6 @@
       <artifactId>jopt-simple</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-    </dependency>
-    <dependency>
       <groupId>nl.jqno.equalsverifier</groupId>
       <artifactId>equalsverifier</artifactId>
       <scope>test</scope>
diff --git a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
index e8f6c93..0450102 100644
--- a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
+++ b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
@@ -42,7 +42,7 @@
     <!-- Kafka  -->
     <dependency>
       <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_${kafka.scala.version}</artifactId>
+      <artifactId>kafka_2.10</artifactId>
       <version>${kafka.lib.version}</version>
       <exclusions>
         <exclusion>
@@ -63,5 +63,10 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>2.10.5</version>
+    </dependency>
   </dependencies>
 </project>
diff --git a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataProducer.java b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataProducer.java
new file mode 100644
index 0000000..0eb4ac6
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataProducer.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka.server;
+
+import java.util.Properties;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.apache.pinot.core.realtime.stream.StreamDataProducer;
+
+
+public class KafkaDataProducer implements StreamDataProducer {
+  Producer<byte[], byte[]> producer;
+
+  @Override
+  public void init(Properties props) {
+    ProducerConfig producerConfig = new ProducerConfig(props);
+    this.producer = new Producer(producerConfig);
+  }
+
+  @Override
+  public void produce(String topic, byte[] payload) {
+    KeyedMessage<byte[], byte[]> data = new KeyedMessage<>(topic, payload);
+    this.produce(data);
+  }
+
+  @Override
+  public void produce(String topic, byte[] key, byte[] payload) {
+    KeyedMessage<byte[], byte[]> data = new KeyedMessage<>(topic, key, payload);
+    this.produce(data);
+  }
+
+  public void produce(KeyedMessage message) {
+    producer.send(message);
+  }
+
+  @Override
+  public void close() {
+    producer.close();
+  }
+}
diff --git a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStarterUtils.java b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataServerStartable.java
similarity index 63%
rename from pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStarterUtils.java
rename to pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataServerStartable.java
index 5f1de99..dcd44c0 100644
--- a/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/KafkaStarterUtils.java
+++ b/pinot-connectors/pinot-connector-kafka-0.9/src/main/java/org/apache/pinot/core/realtime/impl/kafka/server/KafkaDataServerStartable.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.realtime.impl.kafka;
+package org.apache.pinot.core.realtime.impl.kafka.server;
 
 import java.io.File;
 import java.security.Permission;
@@ -29,17 +29,25 @@ import kafka.server.KafkaServerStartable;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 
 
-/**
- * Utilities to start Kafka during unit tests.
- *
- */
-public class KafkaStarterUtils {
+public class KafkaDataServerStartable implements StreamDataServerStartable {
   public static final int DEFAULT_KAFKA_PORT = 19092;
   public static final int DEFAULT_BROKER_ID = 0;
   public static final String DEFAULT_ZK_STR = ZkStarter.DEFAULT_ZK_STR + "/kafka";
   public static final String DEFAULT_KAFKA_BROKER = "localhost:" + DEFAULT_KAFKA_PORT;
+  private static final String PORT = "port";
+  private static final String BROKER_ID = "brokerId";
+  private static final String ZK_STR = "zkStr";
+  private static final String LOG_DIR_PATH = "logDirPath";
+  private static final int DEFAULT_TOPIC_PARTITION = 1;
+
+  private KafkaServerStartable serverStartable;
+  private String zkStr;
+  private int port;
+  private int brokerId;
+  private String logDirPath;
 
   public static Properties getDefaultKafkaConfiguration() {
     final Properties configuration = new Properties();
@@ -50,51 +58,34 @@ public class KafkaStarterUtils {
     // Set host name
     configureHostName(configuration, "localhost");
 
+    configuration.put(PORT, DEFAULT_KAFKA_PORT);
+    configuration.put(BROKER_ID, DEFAULT_BROKER_ID);
+    configuration.put(ZK_STR, DEFAULT_ZK_STR);
+    configuration.put(LOG_DIR_PATH, "/tmp/kafka-" + Double.toHexString(Math.random()));
+
     return configuration;
   }
 
-  public static List<KafkaServerStartable> startServers(final int brokerCount, final int port, final String zkStr,
+  public static List<StreamDataServerStartable> startServers(final int brokerCount, final int port, final String zkStr,
       final Properties configuration) {
-    List<KafkaServerStartable> startables = new ArrayList<>(brokerCount);
+    List<StreamDataServerStartable> startables = new ArrayList<>(brokerCount);
 
     for (int i = 0; i < brokerCount; i++) {
-      startables.add(startServer(port + i, i, zkStr, "/tmp/kafka-" + Double.toHexString(Math.random()), configuration));
+      startables.add(startServer(port + i, i, zkStr, configuration));
     }
-
     return startables;
   }
 
-  public static KafkaServerStartable startServer(final int port, final int brokerId, final String zkStr,
+  public static StreamDataServerStartable startServer(final int port, final int brokerId, final String zkStr,
       final Properties configuration) {
-    return startServer(port, brokerId, zkStr, "/tmp/kafka-" + Double.toHexString(Math.random()), configuration);
-  }
-
-  public static KafkaServerStartable startServer(final int port, final int brokerId, final String zkStr,
-      final String logDirPath, final Properties configuration) {
-    // Create the ZK nodes for Kafka, if needed
-    int indexOfFirstSlash = zkStr.indexOf('/');
-    if (indexOfFirstSlash != -1) {
-      String bareZkUrl = zkStr.substring(0, indexOfFirstSlash);
-      String zkNodePath = zkStr.substring(indexOfFirstSlash);
-      ZkClient client = new ZkClient(bareZkUrl);
-      client.createPersistent(zkNodePath, true);
-      client.close();
-    }
-
-    File logDir = new File(logDirPath);
-    logDir.mkdirs();
-
-    configureKafkaPort(configuration, port);
-    configureZkConnectionString(configuration, zkStr);
-    configureBrokerId(configuration, brokerId);
-    configureKafkaLogDirectory(configuration, logDir);
-    configuration.put("zookeeper.session.timeout.ms", "60000");
-    KafkaConfig config = new KafkaConfig(configuration);
-
-    KafkaServerStartable serverStartable = new KafkaServerStartable(config);
-    serverStartable.startup();
-
-    return serverStartable;
+    final KafkaDataServerStartable kafkaDataServerStartable = new KafkaDataServerStartable();
+    configuration.put(PORT, port);
+    configuration.put(BROKER_ID, brokerId);
+    configuration.put(ZK_STR, zkStr);
+    configuration.put(LOG_DIR_PATH, "/tmp/kafka-" + Double.toHexString(Math.random()));
+    kafkaDataServerStartable.init(configuration);
+    kafkaDataServerStartable.start();
+    return kafkaDataServerStartable;
   }
 
   public static void configureSegmentSizeBytes(Properties properties, int segmentSize) {
@@ -129,17 +120,6 @@ public class KafkaStarterUtils {
     configuration.put("host.name", hostName);
   }
 
-  public static void stopServer(KafkaServerStartable serverStartable) {
-    serverStartable.shutdown();
-    FileUtils.deleteQuietly(new File(serverStartable.serverConfig().logDirs().apply(0)));
-  }
-
-  public static void createTopic(String kafkaTopic, String zkStr, int partitionCount) {
-    invokeTopicCommand(
-        new String[]{"--create", "--zookeeper", zkStr, "--replication-factor", "1", "--partitions", Integer.toString(
-            partitionCount), "--topic", kafkaTopic});
-  }
-
   private static void invokeTopicCommand(String[] args) {
     // jfim: Use Java security to trap System.exit in Kafka 0.9's TopicCommand
     System.setSecurityManager(new SecurityManager() {
@@ -168,4 +148,58 @@ public class KafkaStarterUtils {
   public static void deleteTopic(String kafkaTopic, String zkStr) {
     invokeTopicCommand(new String[]{"--delete", "--zookeeper", zkStr, "--topic", kafkaTopic});
   }
+
+  public void init(Properties props) {
+    if (props == null) {
+      props = getDefaultKafkaConfiguration();
+    }
+    port = Integer.parseInt(props.getProperty(PORT));
+    brokerId = Integer.parseInt(props.getProperty(BROKER_ID));
+    zkStr = props.getProperty(ZK_STR);
+    logDirPath = props.getProperty(LOG_DIR_PATH);
+
+    // Create the ZK nodes for Kafka, if needed
+    int indexOfFirstSlash = zkStr.indexOf('/');
+    if (indexOfFirstSlash != -1) {
+      String bareZkUrl = zkStr.substring(0, indexOfFirstSlash);
+      String zkNodePath = zkStr.substring(indexOfFirstSlash);
+      ZkClient client = new ZkClient(bareZkUrl);
+      client.createPersistent(zkNodePath, true);
+      client.close();
+    }
+
+    File logDir = new File(logDirPath);
+    logDir.mkdirs();
+
+    configureKafkaPort(props, port);
+    configureZkConnectionString(props, zkStr);
+    configureBrokerId(props, brokerId);
+    configureKafkaLogDirectory(props, logDir);
+    props.put("zookeeper.session.timeout.ms", "60000");
+    KafkaConfig config = new KafkaConfig(props);
+
+    serverStartable = new KafkaServerStartable(config);
+  }
+
+  @Override
+  public void start() {
+    serverStartable.startup();
+  }
+
+  @Override
+  public void stop() {
+    serverStartable.shutdown();
+    FileUtils.deleteQuietly(new File(serverStartable.serverConfig().logDirs().apply(0)));
+  }
+
+  @Override
+  public void createTopic(String topic, Properties props) {
+    int partitionCount = DEFAULT_TOPIC_PARTITION;
+    if (props.containsKey("partition")) {
+      partitionCount = Integer.parseInt(props.getProperty("partition"));
+    }
+    invokeTopicCommand(
+        new String[]{"--create", "--zookeeper", this.zkStr, "--replication-factor", "1", "--partitions", Integer.toString(
+            partitionCount), "--topic", topic});
+  }
 }
diff --git a/pinot-core/pom.xml b/pinot-core/pom.xml
index 51c9231..d074639 100644
--- a/pinot-core/pom.xml
+++ b/pinot-core/pom.xml
@@ -161,10 +161,6 @@
       </exclusions>
     </dependency>
     <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-    </dependency>
-    <dependency>
       <groupId>net.sf.jopt-simple</groupId>
       <artifactId>jopt-simple</artifactId>
     </dependency>
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProducer.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProducer.java
new file mode 100644
index 0000000..53275a0
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProducer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.stream;
+
+import java.util.Properties;
+
+
+/**
+ * StreamDataProducer is the interface for stream data producers. E.g. KafkaDataProducer.
+ */
+public interface StreamDataProducer {
+  void init(Properties props);
+
+  void produce(String topic, byte[] payload);
+
+  void produce(String topic, byte[] key, byte[] payload);
+
+  void close();
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProvider.java
new file mode 100644
index 0000000..aec83a3
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataProvider.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.stream;
+
+import java.util.Properties;
+
+
+public class StreamDataProvider {
+  public static StreamDataServerStartable getServerDataStartable(String clazz, Properties props)
+      throws Exception {
+    final StreamDataServerStartable streamDataServerStartable =
+        (StreamDataServerStartable) Class.forName(clazz).newInstance();
+    streamDataServerStartable.init(props);
+    return streamDataServerStartable;
+  }
+
+  public static StreamDataProducer getStreamDataProducer(String clazz, Properties props)
+      throws Exception {
+
+    final StreamDataProducer streamDataProducer = (StreamDataProducer) Class.forName(clazz).newInstance();
+    streamDataProducer.init(props);
+    return streamDataProducer;
+  }
+
+  public static StreamDataServerStartable getServerDataStartable(String clazz)
+      throws Exception {
+    return getServerDataStartable(clazz, null);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataServerStartable.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataServerStartable.java
new file mode 100644
index 0000000..f53e201
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamDataServerStartable.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.stream;
+
+import java.util.Properties;
+
+
+/**
+ * StreamDataServerStartable is the interface for stream data sources. E.g. KafkaDataServerStartable, KinesisDataServerStartable.
+ */
+public interface StreamDataServerStartable {
+  void init(Properties props);
+  void start();
+  void stop();
+
+  void createTopic(String topic, Properties props);
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/function/FunctionExpressionEvaluatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/function/FunctionExpressionEvaluatorTest.java
index b6b41ff..dca78da 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/function/FunctionExpressionEvaluatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/function/FunctionExpressionEvaluatorTest.java
@@ -26,7 +26,6 @@ import org.joda.time.MutableDateTime;
 import org.joda.time.format.DateTimeFormat;
 import org.testng.Assert;
 import org.testng.annotations.Test;
-import scala.collection.mutable.StringBuilder;
 
 
 public class FunctionExpressionEvaluatorTest {
diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml
index b58647f..b54a7d8 100644
--- a/pinot-integration-tests/pom.xml
+++ b/pinot-integration-tests/pom.xml
@@ -191,7 +191,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-connector-kafka-${kafka.version}</artifactId>
+      <artifactId>pinot-connector-kafka-${kafka.lib.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 67fabc6..faffe9b 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -30,7 +30,6 @@ import java.util.Map;
 import java.util.concurrent.Executor;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-import kafka.server.KafkaServerStartable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.client.ConnectionFactory;
 import org.apache.pinot.common.config.ColumnPartitionConfig;
@@ -40,7 +39,7 @@ import org.apache.pinot.common.config.TagNameUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 
@@ -73,12 +72,13 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
   protected final File _avroDir = new File(_tempDir, "avroDir");
   protected final File _segmentDir = new File(_tempDir, "segmentDir");
   protected final File _tarDir = new File(_tempDir, "tarDir");
-  protected List<KafkaServerStartable> _kafkaStarters;
+  protected List<StreamDataServerStartable> _kafkaStarters;
 
   private org.apache.pinot.client.Connection _pinotConnection;
   private Connection _h2Connection;
   private QueryGenerator _queryGenerator;
 
+
   /**
    * The following getters can be overridden to change default settings.
    */
@@ -318,8 +318,9 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
       @Override
       public void run() {
         try {
-          ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles, KafkaStarterUtils.DEFAULT_KAFKA_BROKER, kafkaTopic,
-              getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
+          ClusterIntegrationTestUtils
+              .pushAvroIntoKafka(avroFiles, CommonKafkaUtils.DEFAULT_KAFKA_BROKER, kafkaTopic,
+                  getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
         } catch (Exception e) {
           // Ignored
         }
@@ -328,15 +329,17 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
   }
 
   protected void startKafka() {
-    _kafkaStarters = KafkaStarterUtils
-        .startServers(getNumKafkaBrokers(), KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_ZK_STR,
-            KafkaStarterUtils.getDefaultKafkaConfiguration());
-    KafkaStarterUtils.createTopic(getKafkaTopic(), KafkaStarterUtils.DEFAULT_ZK_STR, getNumKafkaPartitions());
+
+    _kafkaStarters =
+        CommonKafkaUtils.startServers(getNumKafkaBrokers(), CommonKafkaUtils.DEFAULT_KAFKA_PORT, CommonKafkaUtils.DEFAULT_ZK_STR,
+            CommonKafkaUtils.getDefaultKafkaConfiguration());
+    _kafkaStarters.get(0)
+        .createTopic(getKafkaTopic(), CommonKafkaUtils.getTopicCreationProps(getNumKafkaPartitions()));
   }
 
   protected void stopKafka() {
-    for (KafkaServerStartable kafkaStarter : _kafkaStarters) {
-      KafkaStarterUtils.stopServer(kafkaStarter);
+    for (StreamDataServerStartable kafkaStarter : _kafkaStarters) {
+      kafkaStarter.stop();
     }
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CommonKafkaUtils.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CommonKafkaUtils.java
new file mode 100644
index 0000000..7c6ed83
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CommonKafkaUtils.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
+
+
+public class CommonKafkaUtils {
+  public static final int DEFAULT_BROKER_ID = 0;
+  public static final String DEFAULT_ZK_STR = ZkStarter.DEFAULT_ZK_STR + "/kafka";
+  public static final String PORT = "port";
+  public static final String BROKER_ID = "brokerId";
+  public static final String ZK_STR = "zkStr";
+  public static final String LOG_DIR_PATH = "logDirPath";
+  public static int DEFAULT_KAFKA_PORT = 19092;
+  public static final String DEFAULT_KAFKA_BROKER = "localhost:" + DEFAULT_KAFKA_PORT;
+  public static final String KAFKA_09_SERVER_STARTABLE_CLASS_NAME = "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable";
+
+  public static Properties getDefaultKafkaConfiguration() {
+    final Properties configuration = new Properties();
+
+    // Enable topic deletion by default for integration tests
+    configureTopicDeletion(configuration, true);
+
+    // Set host name
+    configureHostName(configuration, "localhost");
+
+    configuration.put(PORT, DEFAULT_KAFKA_PORT);
+    configuration.put(BROKER_ID, DEFAULT_BROKER_ID);
+    configuration.put(ZK_STR, DEFAULT_ZK_STR);
+    configuration.put(LOG_DIR_PATH, "/tmp/kafka-" + Double.toHexString(Math.random()));
+
+    return configuration;
+  }
+
+  public static void configureTopicDeletion(Properties configuration, boolean topicDeletionEnabled) {
+    configuration.put("delete.topic.enable", Boolean.toString(topicDeletionEnabled));
+  }
+
+  public static void configureHostName(Properties configuration, String hostName) {
+    configuration.put("host.name", hostName);
+  }
+
+  public static Properties getTopicCreationProps(int numKafkaPartitions) {
+    Properties topicProps = new Properties();
+    topicProps.put("partition", numKafkaPartitions);
+    return topicProps;
+  }
+
+
+  public static List<StreamDataServerStartable> startServers(final int brokerCount, final int port, final String zkStr,
+      final Properties configuration) {
+    List<StreamDataServerStartable> startables = new ArrayList<>(brokerCount);
+
+    for (int i = 0; i < brokerCount; i++) {
+      startables.add(startServer(port + i, i, zkStr, configuration));
+    }
+    return startables;
+  }
+
+  public static StreamDataServerStartable startServer(final int port, final int brokerId, final String zkStr,
+      final Properties configuration) {
+    StreamDataServerStartable kafkaStarter;
+
+    String kafkaClazz = KAFKA_09_SERVER_STARTABLE_CLASS_NAME;
+    try {
+      kafkaStarter = StreamDataProvider.getServerDataStartable(kafkaClazz);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to start " + kafkaClazz, e);
+    }
+
+    configuration.put(CommonKafkaUtils.PORT, port);
+    configuration.put(CommonKafkaUtils.BROKER_ID, brokerId);
+    configuration.put(CommonKafkaUtils.ZK_STR, zkStr);
+    configuration.put(CommonKafkaUtils.LOG_DIR_PATH, "/tmp/kafka-" + Double.toHexString(Math.random()));
+
+    kafkaStarter.init(configuration);
+    kafkaStarter.start();
+    return kafkaStarter;
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
index fc14789..64fe60f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
@@ -51,7 +51,6 @@ import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.validation.OfflineSegmentIntervalChecker;
 import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.ITestContext;
@@ -212,7 +211,7 @@ public class ControllerPeriodicTasksIntegrationTests extends BaseClusterIntegrat
     Assert.assertNotNull(outgoingTimeUnit);
     String timeType = outgoingTimeUnit.toString();
 
-    addRealtimeTable(table, useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR, topic,
+    addRealtimeTable(table, useLlc(), CommonKafkaUtils.DEFAULT_KAFKA_BROKER, CommonKafkaUtils.DEFAULT_ZK_STR, topic,
         getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, TENANT_NAME, TENANT_NAME,
         getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(), getRawIndexColumns(),
         getTaskConfig(), getStreamConsumerFactoryClassName());
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
index 0983d4e..004cdf2 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -33,7 +33,6 @@ import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -132,7 +131,7 @@ public class HybridClusterIntegrationTest extends BaseClusterIntegrationTestSet
     Assert.assertNotNull(outgoingTimeUnit);
     String timeType = outgoingTimeUnit.toString();
 
-    addHybridTable(getTableName(), useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
+    addHybridTable(getTableName(), useLlc(), CommonKafkaUtils.DEFAULT_KAFKA_BROKER, CommonKafkaUtils.DEFAULT_ZK_STR,
         getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName, TENANT_NAME,
         TENANT_NAME, getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(),
         getRawIndexColumns(), getTaskConfig(), getStreamConsumerFactoryClassName(), getSegmentPartitionConfig());
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
index be24be8..970265c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTestCommandLineRunner.java
@@ -34,13 +34,12 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nonnull;
-import kafka.server.KafkaServerStartable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.broker.requesthandler.PinotQueryRequest;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.query.comparison.QueryComparison;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
@@ -188,7 +187,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
     private List<File> _realtimeAvroFiles;
     private File _queryFile;
     private File _responseFile;
-    private KafkaServerStartable _kafkaStarter;
+    private StreamDataServerStartable _kafkaStarter;
     private long _countStarResult;
 
     public CustomHybridClusterIntegrationTest() {
@@ -258,11 +257,11 @@ public class HybridClusterIntegrationTestCommandLineRunner {
 
       // Start Zk and Kafka
       startZk(ZK_PORT);
-      _kafkaStarter = KafkaStarterUtils.startServer(KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID, KAFKA_ZK_STR,
-          KafkaStarterUtils.getDefaultKafkaConfiguration());
+      _kafkaStarter = CommonKafkaUtils.startServer(KAFKA_PORT, CommonKafkaUtils.DEFAULT_BROKER_ID, KAFKA_ZK_STR,
+          CommonKafkaUtils.getDefaultKafkaConfiguration());
 
       // Create Kafka topic
-      KafkaStarterUtils.createTopic(getKafkaTopic(), KAFKA_ZK_STR, getNumKafkaPartitions());
+      _kafkaStarter.createTopic(getKafkaTopic(), CommonKafkaUtils.getTopicCreationProps(getNumKafkaPartitions()));
 
       // Start the Pinot cluster
       ControllerConf config = getDefaultControllerConfiguration();
@@ -379,7 +378,7 @@ public class HybridClusterIntegrationTestCommandLineRunner {
       stopServer();
       stopBroker();
       stopController();
-      KafkaStarterUtils.stopServer(_kafkaStarter);
+      _kafkaStarter.stop();
       stopZk();
 
       FileUtils.deleteDirectory(_tempDir);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
index 8b4baeb..cf833ce 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
@@ -26,7 +26,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.data.Schema;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -90,7 +89,7 @@ public class RealtimeClusterIntegrationTest extends BaseClusterIntegrationTestSe
     Assert.assertNotNull(outgoingTimeUnit);
     String timeType = outgoingTimeUnit.toString();
 
-    addRealtimeTable(getTableName(), useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
+    addRealtimeTable(getTableName(), useLlc(), CommonKafkaUtils.DEFAULT_KAFKA_BROKER, CommonKafkaUtils.DEFAULT_ZK_STR,
         getKafkaTopic(), getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType, schemaName,
         getBrokerTenant(), getServerTenant(), getLoadMode(), getSortedColumn(),
         getInvertedIndexColumns(), getBloomFilterIndexColumns(), getRawIndexColumns(), getTaskConfig(),
diff --git a/pinot-perf/pom.xml b/pinot-perf/pom.xml
index 74bb09f..2a0f5ed 100644
--- a/pinot-perf/pom.xml
+++ b/pinot-perf/pom.xml
@@ -52,7 +52,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-connector-kafka-${kafka.version}</artifactId>
+      <artifactId>pinot-connector-kafka-${kafka.lib.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
index 8bbb4d5..11535aa 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRealtimeConsumptionSpeed.java
@@ -27,8 +27,9 @@ import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import kafka.server.KafkaServerStartable;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
+import org.apache.pinot.integration.tests.CommonKafkaUtils;
 import org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest;
 import org.apache.pinot.util.TestUtils;
 
@@ -58,12 +59,12 @@ public class BenchmarkRealtimeConsumptionSpeed extends RealtimeClusterIntegratio
       throws Exception {
     // Start ZK and Kafka
     startZk();
-    KafkaServerStartable kafkaStarter = KafkaStarterUtils
-        .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID,
-            KafkaStarterUtils.DEFAULT_ZK_STR, KafkaStarterUtils.getDefaultKafkaConfiguration());
+    StreamDataServerStartable kafkaStarter = CommonKafkaUtils
+        .startServer(CommonKafkaUtils.DEFAULT_KAFKA_PORT, CommonKafkaUtils.DEFAULT_BROKER_ID,
+            CommonKafkaUtils.DEFAULT_ZK_STR, CommonKafkaUtils.getDefaultKafkaConfiguration());
 
     // Create Kafka topic
-    KafkaStarterUtils.createTopic(getKafkaTopic(), KafkaStarterUtils.DEFAULT_ZK_STR, 10);
+    kafkaStarter.createTopic(getKafkaTopic(), CommonKafkaUtils.getTopicCreationProps(10));
 
     // Unpack data (needed to get the Avro schema)
     TarGzCompressionUtils.unTar(new File(TestUtils.getFileFromResourceUrl(
@@ -93,7 +94,7 @@ public class BenchmarkRealtimeConsumptionSpeed extends RealtimeClusterIntegratio
       public void run() {
         try {
           ClusterIntegrationTestUtils
-              .pushRandomAvroIntoKafka(avroFiles.get(0), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(),
+              .pushRandomAvroIntoKafka(avroFiles.get(0), CommonKafkaUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(),
                   ROW_COUNT, getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
         } catch (Exception e) {
           // Ignored
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java b/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java
index 71d28e7..7ffb16a 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/RealtimeStressTest.java
@@ -25,10 +25,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
-import kafka.server.KafkaServerStartable;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
+import org.apache.pinot.integration.tests.CommonKafkaUtils;
 import org.apache.pinot.integration.tests.OfflineClusterIntegrationTest;
 import org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest;
 import org.apache.pinot.util.TestUtils;
@@ -59,12 +59,12 @@ public class RealtimeStressTest extends RealtimeClusterIntegrationTest {
       throws Exception {
     // Start ZK and Kafka
     startZk();
-    KafkaServerStartable kafkaStarter = KafkaStarterUtils
-        .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID,
-            KafkaStarterUtils.DEFAULT_ZK_STR, KafkaStarterUtils.getDefaultKafkaConfiguration());
+    StreamDataServerStartable kafkaStarter = CommonKafkaUtils
+        .startServer(CommonKafkaUtils.DEFAULT_KAFKA_PORT, CommonKafkaUtils.DEFAULT_BROKER_ID,
+            CommonKafkaUtils.DEFAULT_ZK_STR, CommonKafkaUtils.getDefaultKafkaConfiguration());
 
     // Create Kafka topic
-    KafkaStarterUtils.createTopic(getKafkaTopic(), KafkaStarterUtils.DEFAULT_ZK_STR, 10);
+    kafkaStarter.createTopic(getKafkaTopic(), CommonKafkaUtils.getTopicCreationProps(10));
 
     // Unpack data (needed to get the Avro schema)
     TarGzCompressionUtils.unTar(new File(TestUtils.getFileFromResourceUrl(
@@ -93,7 +93,7 @@ public class RealtimeStressTest extends RealtimeClusterIntegrationTest {
 
     // Generate ROW_COUNT rows and write them into Kafka
     ClusterIntegrationTestUtils
-        .pushRandomAvroIntoKafka(avroFiles.get(0), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(), ROW_COUNT,
+        .pushRandomAvroIntoKafka(avroFiles.get(0), CommonKafkaUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(), ROW_COUNT,
             getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
     rowsWritten += ROW_COUNT;
 
@@ -115,7 +115,7 @@ public class RealtimeStressTest extends RealtimeClusterIntegrationTest {
       // Write more rows if needed
       if (rowsWritten - pinotRecordCount < MIN_ROW_COUNT) {
         ClusterIntegrationTestUtils
-            .pushRandomAvroIntoKafka(avroFiles.get(0), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(),
+            .pushRandomAvroIntoKafka(avroFiles.get(0), CommonKafkaUtils.DEFAULT_KAFKA_BROKER, getKafkaTopic(),
                 ROW_COUNT, getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn());
         rowsWritten += ROW_COUNT;
       }
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index 6e18295..cf9f507 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -56,8 +56,9 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-connector-kafka-${kafka.version}</artifactId>
+      <artifactId>pinot-connector-kafka-${kafka.lib.version}</artifactId>
       <version>${project.version}</version>
+      <scope>runtime</scope>
     </dependency>
     <dependency>
       <groupId>commons-cli</groupId>
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
index fe004d3..22f7476 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
@@ -23,12 +23,13 @@ import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
-import kafka.server.KafkaServerStartable;
+import java.util.Properties;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.core.data.readers.FileFormat;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
 import org.apache.pinot.tools.streams.AirlineDataStream;
@@ -37,16 +38,21 @@ import static org.apache.pinot.tools.Quickstart.printStatus;
 
 
 public class HybridQuickstart {
-  private HybridQuickstart() {
-  }
-
   private File _offlineQuickStartDataDir;
   private File _realtimeQuickStartDataDir;
-  private KafkaServerStartable _kafkaStarter;
+  private StreamDataServerStartable _kafkaStarter;
   private ZkStarter.ZookeeperInstance _zookeeperInstance;
   private File _schemaFile;
   private File _dataFile;
 
+  private HybridQuickstart() {
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+    new HybridQuickstart().execute();
+  }
+
   private QuickstartTableRequest prepareOfflineTableRequest()
       throws IOException {
     _offlineQuickStartDataDir = new File("quickStartData" + System.currentTimeMillis());
@@ -94,11 +100,16 @@ public class HybridQuickstart {
   private void startKafka() {
     _zookeeperInstance = ZkStarter.startLocalZkServer();
 
-    _kafkaStarter = KafkaStarterUtils
-        .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID,
-            KafkaStarterUtils.DEFAULT_ZK_STR, KafkaStarterUtils.getDefaultKafkaConfiguration());
-
-    KafkaStarterUtils.createTopic("airlineStatsEvents", KafkaStarterUtils.DEFAULT_ZK_STR, 10);
+    String kafkaClazz = "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable";
+    try {
+      _kafkaStarter = StreamDataProvider.getServerDataStartable(kafkaClazz);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to start " + kafkaClazz, e);
+    }
+    _kafkaStarter.start();
+    Properties topicProps = new Properties();
+    topicProps.put("partition", 10);
+    _kafkaStarter.createTopic("airlineStatsEvents", topicProps);
   }
 
   public void execute()
@@ -153,7 +164,7 @@ public class HybridQuickstart {
           stream.shutdown();
           Thread.sleep(2000);
           runner.stop();
-          KafkaStarterUtils.stopServer(_kafkaStarter);
+          _kafkaStarter.stop();
           ZkStarter.stopLocalZkServer(_zookeeperInstance);
           FileUtils.deleteDirectory(_offlineQuickStartDataDir);
           FileUtils.deleteDirectory(_realtimeQuickStartDataDir);
@@ -163,9 +174,4 @@ public class HybridQuickstart {
       }
     });
   }
-
-  public static void main(String[] args)
-      throws Exception {
-    new HybridQuickstart().execute();
-  }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
index 8df1fbb..44661fa 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/RealtimeQuickStart.java
@@ -22,10 +22,11 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.net.URL;
-import kafka.server.KafkaServerStartable;
+import java.util.Properties;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.ZkStarter;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Quickstart.Color;
 import org.apache.pinot.tools.admin.command.QuickstartRunner;
 import org.apache.pinot.tools.streams.MeetupRsvpStream;
@@ -35,9 +36,16 @@ import static org.apache.pinot.tools.Quickstart.printStatus;
 
 
 public class RealtimeQuickStart {
+  private StreamDataServerStartable _kafkaStarter;
+
   private RealtimeQuickStart() {
   }
 
+  public static void main(String[] args)
+      throws Exception {
+    new RealtimeQuickStart().execute();
+  }
+
   public void execute()
       throws Exception {
     final File quickStartDataDir = new File("quickStartData" + System.currentTimeMillis());
@@ -64,10 +72,18 @@ public class RealtimeQuickStart {
 
     printStatus(Color.CYAN, "***** Starting Kafka *****");
     final ZkStarter.ZookeeperInstance zookeeperInstance = ZkStarter.startLocalZkServer();
-    final KafkaServerStartable kafkaStarter = KafkaStarterUtils
-        .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_BROKER_ID,
-            KafkaStarterUtils.DEFAULT_ZK_STR, KafkaStarterUtils.getDefaultKafkaConfiguration());
-    KafkaStarterUtils.createTopic("meetupRSVPEvents", KafkaStarterUtils.DEFAULT_ZK_STR, 10);
+
+    String kafkaClazz = "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable";
+    try {
+      _kafkaStarter = StreamDataProvider.getServerDataStartable(kafkaClazz);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to start " + kafkaClazz, e);
+    }
+    _kafkaStarter.start();
+    Properties topicProps = new Properties();
+    topicProps.put("partition", 10);
+    _kafkaStarter.createTopic("meetupRSVPEvents", topicProps);
+
     printStatus(Color.CYAN, "***** Starting Zookeeper, controller, server and broker *****");
     runner.startAll();
     printStatus(Color.CYAN, "***** Adding meetupRSVP schema *****");
@@ -87,7 +103,7 @@ public class RealtimeQuickStart {
           printStatus(Color.GREEN, "***** Shutting down realtime quick start *****");
           meetupRSVPProvider.stopPublishing();
           runner.stop();
-          KafkaStarterUtils.stopServer(kafkaStarter);
+          _kafkaStarter.stop();
           ZkStarter.stopLocalZkServer(zookeeperInstance);
           FileUtils.deleteDirectory(quickStartDataDir);
         } catch (Exception e) {
@@ -130,9 +146,4 @@ public class RealtimeQuickStart {
 
     printStatus(Color.GREEN, "You can always go to http://localhost:9000/query/ to play around in the query console");
   }
-
-  public static void main(String[] args)
-      throws Exception {
-    new RealtimeQuickStart().execute();
-  }
 }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java
index 38f042c..e1e7d40 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartKafkaCommand.java
@@ -20,7 +20,8 @@ package org.apache.pinot.tools.admin.command;
 
 import java.io.File;
 import java.io.IOException;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
+import org.apache.pinot.core.realtime.stream.StreamDataServerStartable;
 import org.apache.pinot.tools.Command;
 import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
@@ -32,17 +33,22 @@ import org.slf4j.LoggerFactory;
  */
 public class StartKafkaCommand extends AbstractBaseAdminCommand implements Command {
   private static final Logger LOGGER = LoggerFactory.getLogger(StartKafkaCommand.class);
+  private static final int DEFAULT_KAFKA_PORT = 19092;
+  private static final int DEFAULT_BROKER_ID = 0;
+
+
   @Option(name = "-port", required = false, metaVar = "<int>", usage = "Port to start Kafka server on.")
-  private int _port = KafkaStarterUtils.DEFAULT_KAFKA_PORT;
+  private int _port = DEFAULT_KAFKA_PORT;
 
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help = false;
 
   @Option(name = "-brokerId", required = false, metaVar = "<int>", usage = "Kafka broker ID.")
-  private int _brokerId = KafkaStarterUtils.DEFAULT_BROKER_ID;
+  private int _brokerId = DEFAULT_BROKER_ID;
 
   @Option(name = "-zkAddress", required = false, metaVar = "<string>", usage = "Address of Zookeeper.")
   private String _zkAddress = "localhost:2181";
+  private StreamDataServerStartable _kafkaStarter;
 
   @Override
   public boolean getHelp() {
@@ -67,7 +73,13 @@ public class StartKafkaCommand extends AbstractBaseAdminCommand implements Comma
   @Override
   public boolean execute()
       throws IOException {
-    KafkaStarterUtils.startServer(_port, _brokerId, _zkAddress, KafkaStarterUtils.getDefaultKafkaConfiguration());
+    String kafkaClazz = "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataServerStartable";
+    try {
+      _kafkaStarter = StreamDataProvider.getServerDataStartable(kafkaClazz);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to start " + kafkaClazz, e);
+    }
+    _kafkaStarter.start();
 
     LOGGER.info("Start kafka at localhost:" + _port + " in thread " + Thread.currentThread().getName());
 
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
index 0a75023..f0ff30e 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamAvroIntoKafkaCommand.java
@@ -24,14 +24,12 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.pinot.common.utils.HashUtil;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
 import org.apache.pinot.core.util.AvroUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataProducer;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
 import org.apache.pinot.tools.Command;
 import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
@@ -42,8 +40,8 @@ import org.slf4j.LoggerFactory;
  * Class for command to stream Avro data into Kafka.
  */
 public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand implements Command {
+  public static final String DEFAULT_KAFKA_BROKER = "localhost:19092";
   private static final Logger LOGGER = LoggerFactory.getLogger(StreamAvroIntoKafkaCommand.class);
-
   @Option(name = "-avroFile", required = true, metaVar = "<String>", usage = "Avro file to stream.")
   private String _avroFile = null;
 
@@ -51,7 +49,7 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
   private boolean _help = false;
 
   @Option(name = "-kafkaBrokerList", required = false, metaVar = "<String>", usage = "Kafka broker list.")
-  private String _kafkaBrokerList = KafkaStarterUtils.DEFAULT_KAFKA_BROKER;
+  private String _kafkaBrokerList = DEFAULT_KAFKA_BROKER;
 
   @Option(name = "-kafkaTopic", required = true, metaVar = "<String>", usage = "Kafka topic to stream into.")
   private String _kafkaTopic = null;
@@ -104,8 +102,13 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
     properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
     properties.put("request.required.acks", "1");
 
-    ProducerConfig producerConfig = new ProducerConfig(properties);
-    Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(producerConfig);
+    String producerClass = "org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataProducer";
+    StreamDataProducer streamDataProducer;
+    try {
+      streamDataProducer = StreamDataProvider.getStreamDataProducer(producerClass, properties);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to get StreamDataProducer - " + producerClass, e);
+    }
     try {
       // Open the Avro file
       DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(new File(_avroFile));
@@ -115,11 +118,7 @@ public class StreamAvroIntoKafkaCommand extends AbstractBaseAdminCommand impleme
         // Write the message to Kafka
         String recordJson = genericRecord.toString();
         byte[] bytes = recordJson.getBytes("utf-8");
-        KeyedMessage<byte[], byte[]> data =
-            new KeyedMessage<byte[], byte[]>(_kafkaTopic, Longs.toByteArray(HashUtil.hash64(bytes, bytes.length)),
-                bytes);
-
-        producer.send(data);
+        streamDataProducer.produce(_kafkaTopic, Longs.toByteArray(HashUtil.hash64(bytes, bytes.length)), bytes);
 
         // Sleep between messages
         if (sleepRequired) {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
index 7c238bf..9a189cf 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
@@ -27,9 +27,6 @@ import java.io.IOException;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
@@ -37,7 +34,8 @@ import org.apache.pinot.common.data.FieldSpec;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.data.TimeFieldSpec;
 import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamDataProducer;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
 import org.apache.pinot.tools.Quickstart;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,28 +43,29 @@ import org.slf4j.LoggerFactory;
 
 public class AirlineDataStream {
   private static final Logger logger = LoggerFactory.getLogger(AirlineDataStream.class);
+  private static final String DEFAULT_KAFKA_BROKER = "localhost:19092";
 
   Schema pinotSchema;
   File avroFile;
   DataFileStream<GenericRecord> avroDataStream;
   Integer currentTimeValue = 16102;
   boolean keepIndexing = true;
-  private Producer<String, byte[]> producer;
   ExecutorService service;
   int counter = 0;
+  private StreamDataProducer producer;
 
   public AirlineDataStream(Schema pinotSchema, File avroFile)
-      throws FileNotFoundException, IOException {
+      throws Exception {
     this.pinotSchema = pinotSchema;
     this.avroFile = avroFile;
     createStream();
     Properties properties = new Properties();
-    properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
+    properties.put("metadata.broker.list", DEFAULT_KAFKA_BROKER);
     properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
     properties.put("request.required.acks", "1");
 
-    ProducerConfig producerConfig = new ProducerConfig(properties);
-    producer = new Producer<String, byte[]>(producerConfig);
+    producer = StreamDataProvider.getStreamDataProducer("org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataProducer", properties);
+
     service = Executors.newFixedThreadPool(1);
     Quickstart.printStatus(Quickstart.Color.YELLOW,
         "***** Offine data has max time as 16101, realtime will start consuming from time 16102 and increment time every 3000 events *****");
@@ -97,9 +96,7 @@ public class AirlineDataStream {
       avroDataStream = null;
       return;
     }
-    KeyedMessage<String, byte[]> data =
-        new KeyedMessage<String, byte[]>("airlineStatsEvents", message.toString().getBytes("UTF-8"));
-    producer.send(data);
+    producer.produce("airlineStatsEvents", message.toString().getBytes("UTF-8"));
   }
 
   public void run() {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
index 114072f..c4e1539 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
@@ -23,40 +23,40 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.Properties;
 import javax.websocket.ClientEndpointConfig;
 import javax.websocket.Endpoint;
 import javax.websocket.EndpointConfig;
 import javax.websocket.MessageHandler;
 import javax.websocket.Session;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 import org.apache.pinot.common.data.Schema;
 import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder;
-import org.apache.pinot.core.realtime.impl.kafka.KafkaStarterUtils;
+import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.apache.pinot.core.realtime.stream.StreamDataProducer;
+import org.apache.pinot.core.realtime.stream.StreamDataProvider;
 import org.glassfish.tyrus.client.ClientManager;
 
 
 public class MeetupRsvpStream {
+
+  private static final String DEFAULT_KAFKA_BROKER = "localhost:19092";
+
   private Schema schema;
-  private Producer<String, byte[]> producer;
+  private StreamDataProducer producer;
   private boolean keepPublishing = true;
   private ClientManager client;
 
   public MeetupRsvpStream(File schemaFile)
-      throws IOException, URISyntaxException {
+      throws Exception {
     schema = Schema.fromFile(schemaFile);
 
     Properties properties = new Properties();
-    properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
+    properties.put("metadata.broker.list", DEFAULT_KAFKA_BROKER);
     properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
     properties.put("request.required.acks", "1");
 
-    ProducerConfig producerConfig = new ProducerConfig(properties);
-    producer = new Producer<String, byte[]>(producerConfig);
+    producer = StreamDataProvider
+        .getStreamDataProducer("org.apache.pinot.core.realtime.impl.kafka.server.KafkaDataProducer", properties);
   }
 
   public void stopPublishing() {
@@ -68,7 +68,8 @@ public class MeetupRsvpStream {
     try {
 
       final ClientEndpointConfig cec = ClientEndpointConfig.Builder.create().build();
-      final KafkaJSONMessageDecoder decoder = new KafkaJSONMessageDecoder();
+      final StreamMessageDecoder decoder =
+          (StreamMessageDecoder) Class.forName("org.apache.pinot.core.realtime.impl.kafka.KafkaJSONMessageDecoder").newInstance();
       decoder.init(null, schema, null);
       client = ClientManager.createClient();
       client.connectToServer(new Endpoint() {
@@ -108,9 +109,7 @@ public class MeetupRsvpStream {
                   extracted.put("rsvp_count", 1);
 
                   if (keepPublishing) {
-                    KeyedMessage<String, byte[]> data =
-                        new KeyedMessage<String, byte[]>("meetupRSVPEvents", extracted.toString().getBytes("UTF-8"));
-                    producer.send(data);
+                    producer.produce("meetupRSVPEvents", extracted.toString().getBytes("UTF-8"));
                   }
                 } catch (Exception e) {
                   //LOGGER.error("error processing raw event ", e);
diff --git a/pom.xml b/pom.xml
index a734f94..0d00729 100644
--- a/pom.xml
+++ b/pom.xml
@@ -141,8 +141,7 @@
     kafka dependency is still explicitly defined in pinot-integration-tests, pinot-tools and pinot-perf pom files.
     To change kafka connector dependency, we only need to update this version number config.
     TODO: figure out a way to inject kafka dependency instead of explicitly setting the kafka module dependency -->
-    <kafka.version>0.9</kafka.version>
-    <kafka.scala.version>2.10</kafka.scala.version>
+    <kafka.lib.version>0.9</kafka.lib.version>
   </properties>
 
   <profiles>
@@ -929,12 +928,6 @@
         <artifactId>jopt-simple</artifactId>
         <version>4.6</version>
       </dependency>
-      <!-- kafka_2.10 & larray use scala-library -->
-      <dependency>
-        <groupId>org.scala-lang</groupId>
-        <artifactId>scala-library</artifactId>
-        <version>2.10.5</version>
-      </dependency>
       <dependency>
         <groupId>commons-lang</groupId>
         <artifactId>commons-lang</artifactId>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org