Posted to commits@pinot.apache.org by xi...@apache.org on 2019/07/03 10:13:07 UTC
[incubator-pinot] 02/02: Adding support for Kafka 2.0
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch kafka_2.0
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit d9e031618c4c7fa28e64d62858aa7e4a36d6f279
Author: Xiang Fu <xi...@traceable.ai>
AuthorDate: Mon Jul 1 16:17:25 2019 -0700
Adding support for Kafka 2.0
---
pinot-connectors/pinot-connector-kafka-0.9/pom.xml | 5 +
pinot-connectors/pinot-connector-kafka-2.0/pom.xml | 112 +++++---
...umerFactory.java => Kafka2ConsumerFactory.java} | 36 +--
.../impl/kafka2/Kafka2ConsumerManager.java | 191 ++++++++++++++
.../impl/kafka2/Kafka2HighLevelStreamConfig.java | 135 ++++++++++
.../realtime/impl/kafka2/Kafka2MessageBatch.java | 61 +++++
.../Kafka2PartitionLevelConnectionHandler.java | 67 +++++
...Kafka2PartitionLevelPartitionLevelConsumer.java | 65 +++++
.../kafka2/Kafka2PartitionLevelStreamConfig.java | 146 +++++++++++
...Kafka2PartitionLevelStreamMetadataProvider.java | 67 +++++
...ties.java => Kafka2StreamConfigProperties.java} | 32 +--
.../impl/kafka2/Kafka2StreamLevelConsumer.java | 166 ++++++++++++
.../impl/kafka2/KafkaAvroMessageDecoder.java | 290 +++++++++++++++++++++
.../impl/kafka2/KafkaConnectionHandler.java | 61 -----
.../impl/kafka2/KafkaJSONMessageDecoder.java | 63 +++++
.../realtime/impl/kafka2/KafkaMessageBatch.java | 65 -----
.../impl/kafka2/KafkaPartitionConsumer.java | 51 ----
.../kafka2/KafkaPartitionLevelStreamConfig.java | 144 ----------
.../impl/kafka2/KafkaStreamMetadataProvider.java | 81 ------
.../realtime/impl/kafka2/MessageAndOffset.java | 42 +--
.../kafka2/KafkaPartitionLevelConsumerTest.java | 232 +++++++++++++++++
.../KafkaPartitionLevelStreamConfigTest.java | 161 ++++++++++++
.../impl/kafka2/utils/EmbeddedZooKeeper.java | 60 +++++
.../impl/kafka2/utils/MiniKafkaCluster.java | 175 +++++++++++++
pinot-connectors/pom.xml | 12 +
25 files changed, 2024 insertions(+), 496 deletions(-)
diff --git a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
index ae0317e..852c29c 100644
--- a/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
+++ b/pinot-connectors/pinot-connector-kafka-0.9/pom.xml
@@ -63,5 +63,10 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>2.10.5</version>
+ </dependency>
</dependencies>
</project>
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/pom.xml b/pinot-connectors/pinot-connector-kafka-2.0/pom.xml
index f351219..2a9c155 100644
--- a/pinot-connectors/pinot-connector-kafka-2.0/pom.xml
+++ b/pinot-connectors/pinot-connector-kafka-2.0/pom.xml
@@ -22,46 +22,82 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>pinot-connectors</artifactId>
- <groupId>org.apache.pinot</groupId>
- <version>0.2.0-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
- <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>pinot-connectors</artifactId>
+ <groupId>org.apache.pinot</groupId>
+ <version>0.2.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>pinot-connector-kafka-2.0</artifactId>
+ <name>Pinot Connector Kafka 2.0</name>
+ <url>https://pinot.apache.org/</url>
+ <properties>
+ <pinot.root>${basedir}/../..</pinot.root>
+ <kafka.version>2.0.0</kafka.version>
+ </properties>
- <artifactId>pinot-connector-kafka-2.0</artifactId>
+ <dependencies>
- <properties>
- <pinot.root>${basedir}/../..</pinot.root>
- <kafka.version>2.0.0</kafka.version>
- </properties>
+ <!-- Kafka -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.sf.jopt-simple</groupId>
+ <artifactId>jopt-simple</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
- <dependencies>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.12</artifactId>
+ <version>${kafka.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.sf.jopt-simple</groupId>
+ <artifactId>jopt-simple</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ </exclusions>
+ <scope>test</scope>
+ </dependency>
- <!-- Kafka -->
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>${kafka.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>net.sf.jopt-simple</groupId>
- <artifactId>jopt-simple</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>2.12.8</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>
\ No newline at end of file
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerFactory.java
similarity index 57%
rename from pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java
rename to pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerFactory.java
index cc3d8a6..3eab517 100644
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConsumerFactory.java
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerFactory.java
@@ -26,24 +26,26 @@ import org.apache.pinot.core.realtime.stream.StreamConsumerFactory;
import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
-public class KafkaConsumerFactory extends StreamConsumerFactory {
- @Override
- public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {
- return new KafkaPartitionConsumer(_streamConfig, partition);
- }
- @Override
- public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema, InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) {
- throw new UnsupportedOperationException("High level consumer not supported in kafka 2. Use Kafka partition level consumers");
- }
+public class Kafka2ConsumerFactory extends StreamConsumerFactory {
+ @Override
+ public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {
+ return new Kafka2PartitionLevelPartitionLevelConsumer(clientId, _streamConfig, partition);
+ }
- @Override
- public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
- return null;
- }
+ @Override
+ public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema,
+ InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) {
+ return new Kafka2StreamLevelConsumer(clientId, tableName, _streamConfig, schema, instanceZKMetadata, serverMetrics);
+ }
- @Override
- public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
- throw new UnsupportedOperationException("High level consumer not supported in kafka 2. Use Kafka partition level consumers");
- }
+ @Override
+ public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
+ return new Kafka2PartitionLevelStreamMetadataProvider(clientId, _streamConfig, partition);
+ }
+
+ @Override
+ public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
+ return new Kafka2PartitionLevelStreamMetadataProvider(clientId, _streamConfig);
+ }
}
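For context, the factory above is the class a table's stream configuration points at. Below is a minimal sketch of such a configuration map; the exact property keys are assumptions for illustration and are not taken from this diff:

import java.util.HashMap;
import java.util.Map;

public class FactoryConfigSketch {
  public static void main(String[] args) {
    // Hypothetical stream config; key names are assumptions for illustration.
    Map<String, String> streamConfigMap = new HashMap<>();
    streamConfigMap.put("streamType", "kafka");
    streamConfigMap.put("stream.kafka.topic.name", "myTopic");
    streamConfigMap.put("stream.kafka.consumer.factory.class.name",
        "org.apache.pinot.core.realtime.impl.kafka2.Kafka2ConsumerFactory");
    streamConfigMap.put("stream.kafka.broker.list", "localhost:9092");
    streamConfigMap.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}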
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerManager.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerManager.java
new file mode 100644
index 0000000..74e3ee2
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2ConsumerManager.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.ImmutableTriple;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Manager for Kafka consumers that reuses consumers and delays their shutdown.
+ *
+ * This is a workaround for a flaw in the current realtime design, where any issue while flushing/committing offsets
+ * causes duplicate or dropped events. Kafka consumption is driven by the controller, which assigns a realtime segment
+ * to the servers; when a server is assigned a new realtime segment, it creates a Kafka consumer, consumes until it
+ * reaches a threshold, then flushes to disk, writes metadata to Helix indicating the segment is completed, commits
+ * Kafka offsets to ZK and then shuts down the consumer. The controller notices the metadata write and reassigns a
+ * segment to the server, so that it can keep on consuming.
+ *
+ * This logic breaks if committing Kafka offsets fails, because at that point the committed state is unknown. The
+ * proper fix would be to keep using the same consumer and retry the offset commit later, but we currently create a
+ * new Kafka consumer whenever we get a new segment while keeping the old one around; Kafka then rebalances the
+ * partitions between the two consumers (one of which is no longer reading anything), so each receives only half the
+ * events. Because that logic is stateless and driven by Helix, there is no clean way to keep the consumer alive and
+ * pass it to the next segment.
+ *
+ * This class works around the issue by keeping the consumer alive for a grace period instead of shutting it down
+ * immediately, so that the next segment assignment can pick up the same consumer. This way, even if committing the
+ * offsets fails, we can still pick up the same consumer the next time we get a segment assigned to us by the
+ * controller, and hopefully commit our offsets the next time we flush to disk.
+ *
+ * This temporary code should be completely removed once consumption is redesigned to use the lower-level Kafka APIs.
+ */
+public class Kafka2ConsumerManager {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(Kafka2ConsumerManager.class);
+ private static final Long IN_USE = -1L;
+ private static final long CONSUMER_SHUTDOWN_DELAY_MILLIS = TimeUnit.SECONDS.toMillis(60); // One minute
+ private static final Map<ImmutableTriple<String, String, String>, KafkaConsumer> CONSUMER_FOR_CONFIG_KEY =
+ new HashMap<>();
+ private static final IdentityHashMap<KafkaConsumer, Long> CONSUMER_RELEASE_TIME = new IdentityHashMap<>();
+
+ public static KafkaConsumer acquireKafkaConsumerForConfig(Kafka2HighLevelStreamConfig kafka2HighLevelStreamConfig) {
+ final ImmutableTriple<String, String, String> configKey =
+ new ImmutableTriple<>(kafka2HighLevelStreamConfig.getKafkaTopicName(), kafka2HighLevelStreamConfig.getGroupId(),
+ kafka2HighLevelStreamConfig.getBootstrapServers());
+
+ synchronized (Kafka2ConsumerManager.class) {
+ // If we have the consumer and it's not already acquired, return it, otherwise error out if it's already acquired
+ if (CONSUMER_FOR_CONFIG_KEY.containsKey(configKey)) {
+ KafkaConsumer kafkaConsumer = CONSUMER_FOR_CONFIG_KEY.get(configKey);
+ if (CONSUMER_RELEASE_TIME.get(kafkaConsumer).equals(IN_USE)) {
+ throw new RuntimeException("Consumer/iterator " + kafkaConsumer + " already in use!");
+ } else {
+ LOGGER.info("Reusing kafka consumer/iterator with id {}", kafkaConsumer);
+ CONSUMER_RELEASE_TIME.put(kafkaConsumer, IN_USE);
+ return kafkaConsumer;
+ }
+ }
+
+ LOGGER.info("Creating new kafka consumer and iterator for topic {}",
+ kafka2HighLevelStreamConfig.getKafkaTopicName());
+
+ // Create the consumer
+
+ Properties consumerProp = new Properties();
+ consumerProp.putAll(kafka2HighLevelStreamConfig.getKafkaConsumerProperties());
+ consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka2HighLevelStreamConfig.getBootstrapServers());
+ // ByteArrayDeserializer yields byte[] keys/values, matching the KafkaConsumer<byte[], byte[]> used by the stream level consumer
+ consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ KafkaConsumer consumer = new KafkaConsumer<>(consumerProp);
+ consumer.subscribe(Collections.singletonList(kafka2HighLevelStreamConfig.getKafkaTopicName()));
+
+ // Mark both the consumer and iterator as acquired
+ CONSUMER_FOR_CONFIG_KEY.put(configKey, consumer);
+ CONSUMER_RELEASE_TIME.put(consumer, IN_USE);
+
+ LOGGER.info("Created consumer/iterator with id {} for topic {}", consumer,
+ kafka2HighLevelStreamConfig.getKafkaTopicName());
+
+ return consumer;
+ }
+ }
+
+ public static void releaseKafkaConsumer(final KafkaConsumer kafkaConsumer) {
+ synchronized (Kafka2ConsumerManager.class) {
+ // Release the consumer, mark it for shutdown in the future
+ final long releaseTime = System.currentTimeMillis() + CONSUMER_SHUTDOWN_DELAY_MILLIS;
+ CONSUMER_RELEASE_TIME.put(kafkaConsumer, releaseTime);
+
+ LOGGER.info("Marking consumer/iterator with id {} for release at {}", kafkaConsumer, releaseTime);
+
+ // Schedule the shutdown of the consumer
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ // Await the shutdown time
+ Uninterruptibles.sleepUninterruptibly(CONSUMER_SHUTDOWN_DELAY_MILLIS, TimeUnit.MILLISECONDS);
+
+ // Shutdown all consumers that have not been re-acquired
+ synchronized (Kafka2ConsumerManager.class) {
+ LOGGER.info("Executing release check for consumer/iterator {} at {}, scheduled at ", kafkaConsumer,
+ System.currentTimeMillis(), releaseTime);
+
+ Iterator<Map.Entry<ImmutableTriple<String, String, String>, KafkaConsumer>> configIterator =
+ CONSUMER_FOR_CONFIG_KEY.entrySet().iterator();
+
+ while (configIterator.hasNext()) {
+ Map.Entry<ImmutableTriple<String, String, String>, KafkaConsumer> entry = configIterator.next();
+ // Use distinct names here to avoid shadowing the kafkaConsumer/releaseTime captured from the enclosing method
+ KafkaConsumer candidateConsumer = entry.getValue();
+
+ final Long candidateReleaseTime = CONSUMER_RELEASE_TIME.get(candidateConsumer);
+ if (!candidateReleaseTime.equals(IN_USE) && candidateReleaseTime < System.currentTimeMillis()) {
+ LOGGER.info("Releasing consumer/iterator {}", candidateConsumer);
+
+ try {
+ candidateConsumer.close();
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while shutting down Kafka consumer with id {}", candidateConsumer, e);
+ }
+
+ configIterator.remove();
+ CONSUMER_RELEASE_TIME.remove(candidateConsumer);
+ } else {
+ LOGGER.info("Not releasing consumer/iterator {}, it has been reacquired", candidateConsumer);
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception in release of consumer/iterator {}", e, kafkaConsumer);
+ }
+ }
+ }.start();
+ }
+ }
+
+ public static void closeAllConsumers() {
+ try {
+ // Shutdown all consumers
+ synchronized (Kafka2ConsumerManager.class) {
+ LOGGER.info("Trying to shutdown all the kafka consumers");
+ Iterator<KafkaConsumer> consumerIterator = CONSUMER_FOR_CONFIG_KEY.values().iterator();
+
+ while (consumerIterator.hasNext()) {
+ KafkaConsumer kafkaConsumer = consumerIterator.next();
+ LOGGER.info("Trying to shutdown consumer/iterator {}", kafkaConsumer);
+ try {
+ kafkaConsumer.close();
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while shutting down Kafka consumer with id {}", kafkaConsumer, e);
+ }
+ consumerIterator.remove();
+ }
+ CONSUMER_FOR_CONFIG_KEY.clear();
+ CONSUMER_RELEASE_TIME.clear();
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception during shutting down all kafka consumers", e);
+ }
+ }
+}
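The acquire/release contract above is easier to see in isolation. Below is a minimal, self-contained sketch of the same delayed-release technique using plain JDK types; all names and the grace period are illustrative, not part of this commit:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayedReleaseSketch {
  private static final long IN_USE = -1L;
  private static final long GRACE_MILLIS = TimeUnit.SECONDS.toMillis(60);
  private static final Map<String, AutoCloseable> RESOURCES = new HashMap<>();
  private static final Map<String, Long> RELEASE_TIME = new HashMap<>();
  private static final ScheduledExecutorService SCHEDULER = Executors.newSingleThreadScheduledExecutor();

  // Reuse a live resource if one exists for this key; otherwise create one.
  static synchronized AutoCloseable acquire(String key) {
    AutoCloseable existing = RESOURCES.get(key);
    if (existing != null) {
      if (RELEASE_TIME.get(key) == IN_USE) {
        throw new IllegalStateException("Resource " + key + " already in use");
      }
      RELEASE_TIME.put(key, IN_USE);
      return existing;
    }
    AutoCloseable created = () -> System.out.println("closed " + key);
    RESOURCES.put(key, created);
    RELEASE_TIME.put(key, IN_USE);
    return created;
  }

  // Don't close immediately; give the next acquire a grace period to reuse the resource.
  static synchronized void release(String key) {
    RELEASE_TIME.put(key, System.currentTimeMillis() + GRACE_MILLIS);
    SCHEDULER.schedule(() -> {
      synchronized (DelayedReleaseSketch.class) {
        Long releaseAt = RELEASE_TIME.get(key);
        // Close only if it was not reacquired during the grace period.
        if (releaseAt != null && releaseAt != IN_USE) {
          try {
            RESOURCES.remove(key).close();
          } catch (Exception ignored) {
          }
          RELEASE_TIME.remove(key);
        }
      }
    }, GRACE_MILLIS, TimeUnit.MILLISECONDS);
  }
}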
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2HighLevelStreamConfig.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2HighLevelStreamConfig.java
new file mode 100644
index 0000000..f866288
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2HighLevelStreamConfig.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
+import org.apache.pinot.common.utils.EqualityUtils;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
+
+
+/**
+ * Wrapper around {@link StreamConfig} for use in the {@link Kafka2StreamLevelConsumer}
+ */
+public class Kafka2HighLevelStreamConfig {
+ private static final String DEFAULT_AUTO_COMMIT_ENABLE = "false";
+
+ private static final Map<String, String> defaultProps;
+ private String _kafkaTopicName;
+ private String _groupId;
+ private String _bootstrapServers;
+ private Map<String, String> _kafkaConsumerProperties;
+
+ /**
+ * Builds a wrapper around {@link StreamConfig} to fetch kafka stream level consumer specific configs
+ * @param streamConfig
+ * @param tableName
+ * @param instanceZKMetadata
+ */
+ public Kafka2HighLevelStreamConfig(StreamConfig streamConfig, String tableName,
+ InstanceZKMetadata instanceZKMetadata) {
+ Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
+
+ _kafkaTopicName = streamConfig.getTopicName();
+ String hlcBootstrapBrokerUrlKey = Kafka2StreamConfigProperties
+ .constructStreamProperty(Kafka2StreamConfigProperties.HighLevelConsumer.KAFKA_HLC_BOOTSTRAP_SERVER);
+ _bootstrapServers = streamConfigMap.get(hlcBootstrapBrokerUrlKey);
+ Preconditions.checkNotNull(_bootstrapServers,
+ "Must specify bootstrap broker connect string " + hlcBootstrapBrokerUrlKey + " in high level kafka consumer");
+ _groupId = instanceZKMetadata.getGroupId(tableName);
+
+ _kafkaConsumerProperties = new HashMap<>();
+ String kafkaConsumerPropertyPrefix =
+ Kafka2StreamConfigProperties.constructStreamProperty(Kafka2StreamConfigProperties.KAFKA_CONSUMER_PROP_PREFIX);
+ for (String key : streamConfigMap.keySet()) {
+ if (key.startsWith(kafkaConsumerPropertyPrefix)) {
+ _kafkaConsumerProperties
+ .put(StreamConfigProperties.getPropertySuffix(key, kafkaConsumerPropertyPrefix), streamConfigMap.get(key));
+ }
+ }
+ }
+
+ public String getKafkaTopicName() {
+ return _kafkaTopicName;
+ }
+
+ public String getGroupId() {
+ return _groupId;
+ }
+
+ public Properties getKafkaConsumerProperties() {
+ Properties props = new Properties();
+ for (String key : defaultProps.keySet()) {
+ props.put(key, defaultProps.get(key));
+ }
+ for (String key : _kafkaConsumerProperties.keySet()) {
+ props.put(key, _kafkaConsumerProperties.get(key));
+ }
+ props.put("group.id", _groupId);
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _bootstrapServers);
+ return props;
+ }
+
+ @Override
+ public String toString() {
+ return "Kafka2HighLevelStreamConfig{" + "_kafkaTopicName='" + _kafkaTopicName + '\'' + ", _groupId='" + _groupId
+ + '\'' + ", _bootstrapServers='" + _bootstrapServers + '\'' + ", _kafkaConsumerProperties="
+ + _kafkaConsumerProperties + '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (EqualityUtils.isSameReference(this, o)) {
+ return true;
+ }
+
+ if (EqualityUtils.isNullOrNotSameClass(this, o)) {
+ return false;
+ }
+
+ Kafka2HighLevelStreamConfig that = (Kafka2HighLevelStreamConfig) o;
+
+ return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName) && EqualityUtils
+ .isEqual(_groupId, that._groupId) && EqualityUtils.isEqual(_bootstrapServers, that._bootstrapServers)
+ && EqualityUtils.isEqual(_kafkaConsumerProperties, that._kafkaConsumerProperties);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = EqualityUtils.hashCodeOf(_kafkaTopicName);
+ result = EqualityUtils.hashCodeOf(result, _groupId);
+ result = EqualityUtils.hashCodeOf(result, _bootstrapServers);
+ result = EqualityUtils.hashCodeOf(result, _kafkaConsumerProperties);
+ return result;
+ }
+
+ public String getBootstrapServers() {
+ return _bootstrapServers;
+ }
+
+ static {
+ defaultProps = new HashMap<>();
+ defaultProps.put(Kafka2StreamConfigProperties.HighLevelConsumer.AUTO_COMMIT_ENABLE, DEFAULT_AUTO_COMMIT_ENABLE);
+ }
+}
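The property-mapping convention implemented above is easiest to see with concrete values. A self-contained sketch, assuming the constructed prefix resolves to "stream.kafka.consumer.prop." (the actual prefix is built from Pinot's StreamConfigProperties, which is outside this diff):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ConsumerPropMappingSketch {
  public static void main(String[] args) {
    Map<String, String> streamConfigMap = new HashMap<>();
    streamConfigMap.put("stream.kafka.consumer.prop.auto.offset.reset", "earliest");
    streamConfigMap.put("stream.kafka.topic.name", "myTopic"); // not a consumer prop, ignored below

    String prefix = "stream.kafka.consumer.prop."; // assumed constructed prefix
    Properties consumerProps = new Properties();
    consumerProps.put("auto.commit.enable", "false"); // connector default, as above
    for (Map.Entry<String, String> e : streamConfigMap.entrySet()) {
      if (e.getKey().startsWith(prefix)) {
        // Strip the prefix and pass the remainder straight to the Kafka consumer
        consumerProps.put(e.getKey().substring(prefix.length()), e.getValue());
      }
    }
    System.out.println(consumerProps); // prints the default plus the prefixed override
  }
}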
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2MessageBatch.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2MessageBatch.java
new file mode 100644
index 0000000..13bd41b
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2MessageBatch.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.pinot.core.realtime.stream.MessageBatch;
+
+
+public class Kafka2MessageBatch implements MessageBatch<byte[]> {
+
+ private List<MessageAndOffset> messageList = new ArrayList<>();
+
+ public Kafka2MessageBatch(Iterable<ConsumerRecord<String, byte[]>> iterable) {
+ for (ConsumerRecord<String, byte[]> record : iterable) {
+ messageList.add(new MessageAndOffset(record.value(), record.offset()));
+ }
+ }
+
+ @Override
+ public int getMessageCount() {
+ return messageList.size();
+ }
+
+ @Override
+ public byte[] getMessageAtIndex(int index) {
+ return messageList.get(index).getMessage().array();
+ }
+
+ @Override
+ public int getMessageOffsetAtIndex(int index) {
+ return messageList.get(index).getMessage().arrayOffset();
+ }
+
+ @Override
+ public int getMessageLengthAtIndex(int index) {
+ return messageList.get(index).payloadSize();
+ }
+
+ @Override
+ public long getNextStreamMessageOffsetAtIndex(int index) {
+ return messageList.get(index).getNextOffset();
+ }
+}
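A short standalone sketch of what the batch exposes per record, assuming MessageAndOffset.getNextOffset() returns offset + 1 (as its use for resuming fetches suggests); topic and payloads are placeholders:

import java.util.Arrays;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class MessageBatchSketch {
  public static void main(String[] args) {
    List<ConsumerRecord<String, byte[]>> records = Arrays.asList(
        new ConsumerRecord<>("myTopic", 0, 100L, "key", "hello".getBytes()),
        new ConsumerRecord<>("myTopic", 0, 101L, "key", "world".getBytes()));
    for (ConsumerRecord<String, byte[]> record : records) {
      // The caller uses the next offset (offset + 1) as the start of the next fetch
      System.out.println("payload=" + new String(record.value())
          + " nextOffset=" + (record.offset() + 1));
    }
  }
}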
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelConnectionHandler.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelConnectionHandler.java
new file mode 100644
index 0000000..3f2550d
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelConnectionHandler.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+
+
+public abstract class Kafka2PartitionLevelConnectionHandler {
+
+ protected final Kafka2PartitionLevelStreamConfig _config;
+ protected final String _clientId;
+ protected final int _partition;
+ protected final String _topic;
+ protected final Consumer<String, byte[]> _consumer;
+ protected final TopicPartition _topicPartition;
+
+ public Kafka2PartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition) {
+ _config = new Kafka2PartitionLevelStreamConfig(streamConfig);
+ _clientId = clientId;
+ _partition = partition;
+ _topic = _config.getKafkaTopicName();
+ Properties consumerProp = new Properties();
+ consumerProp.putAll(streamConfig.getStreamConfigsMap());
+ consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
+ consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ _consumer = new KafkaConsumer<>(consumerProp);
+ _topicPartition = new TopicPartition(_topic, _partition);
+ _consumer.assign(Collections.singletonList(_topicPartition));
+ }
+
+ public void close()
+ throws IOException {
+ _consumer.close();
+ }
+
+ @VisibleForTesting
+ protected Kafka2PartitionLevelStreamConfig getKafka2PartitionLevelStreamConfig() {
+ return _config;
+ }
+}
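A standalone sketch of the connection pattern above: a manually assigned consumer pinned to a single partition, with no group management. Broker and topic values are placeholders; ByteArrayDeserializer is used so values come back as byte[]:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignedConsumerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    try (Consumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
      TopicPartition topicPartition = new TopicPartition("myTopic", 0);
      // assign() pins the consumer to one partition; no subscribe(), no rebalancing
      consumer.assign(Collections.singletonList(topicPartition));
      System.out.println("Assigned: " + consumer.assignment());
    }
  }
}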
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelPartitionLevelConsumer.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelPartitionLevelConsumer.java
new file mode 100644
index 0000000..19f520a
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelPartitionLevelConsumer.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.pinot.core.realtime.stream.MessageBatch;
+import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class Kafka2PartitionLevelPartitionLevelConsumer extends Kafka2PartitionLevelConnectionHandler implements PartitionLevelConsumer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(Kafka2PartitionLevelPartitionLevelConsumer.class);
+
+ public Kafka2PartitionLevelPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
+ super(clientId, streamConfig, partition);
+ }
+
+ @Override
+ public MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis)
+ throws TimeoutException {
+ _consumer.seek(_topicPartition, startOffset);
+ ConsumerRecords<String, byte[]> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
+ final Iterable<ConsumerRecord<String, byte[]>> messageAndOffsetIterable =
+ buildOffsetFilteringIterable(consumerRecords.records(_topicPartition), startOffset, endOffset);
+ return new Kafka2MessageBatch(messageAndOffsetIterable);
+ }
+
+ private Iterable<ConsumerRecord<String, byte[]>> buildOffsetFilteringIterable(
+ final List<ConsumerRecord<String, byte[]>> messageAndOffsets, final long startOffset, final long endOffset) {
+ return Iterables.filter(messageAndOffsets, input -> {
+ // Keep only non-null messages with offset in [startOffset, endOffset); endOffset == -1 means no upper bound
+ return input != null && input.offset() >= startOffset && (endOffset > input.offset() || endOffset == -1);
+ });
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ super.close();
+ }
+}
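The seek/poll/filter sequence above, as a standalone sketch against placeholder broker and topic values (a running broker is needed for poll() to return data):

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetRangeFetchSketch {
  public static void main(String[] args) {
    long startOffset = 100L;
    long endOffset = 200L;
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    try (Consumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
      TopicPartition tp = new TopicPartition("myTopic", 0);
      consumer.assign(Collections.singletonList(tp));
      consumer.seek(tp, startOffset);
      List<ConsumerRecord<String, byte[]>> records = consumer.poll(Duration.ofMillis(5000)).records(tp);
      for (ConsumerRecord<String, byte[]> r : records) {
        // Same half-open range check as buildOffsetFilteringIterable() above
        if (r.offset() >= startOffset && (endOffset == -1 || r.offset() < endOffset)) {
          System.out.println("offset=" + r.offset() + " bytes=" + r.value().length);
        }
      }
    }
  }
}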
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamConfig.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamConfig.java
new file mode 100644
index 0000000..fcc0e04
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamConfig.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pinot.common.utils.EqualityUtils;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+
+
+/**
+ * Wrapper around {@link StreamConfig} for use in {@link Kafka2PartitionLevelPartitionLevelConsumer}
+ */
+public class Kafka2PartitionLevelStreamConfig {
+
+ private final String _kafkaTopicName;
+ private final String _bootstrapHosts;
+ private final int _kafkaBufferSize;
+ private final int _kafkaSocketTimeout;
+ private final int _kafkaFetcherSizeBytes;
+ private final int _kafkaFetcherMinBytes;
+ private final Map<String, String> _streamConfigMap;
+
+ /**
+ * Builds a wrapper around {@link StreamConfig} to fetch kafka partition level consumer related configs
+ * @param streamConfig
+ */
+ public Kafka2PartitionLevelStreamConfig(StreamConfig streamConfig) {
+ _streamConfigMap = streamConfig.getStreamConfigsMap();
+
+ _kafkaTopicName = streamConfig.getTopicName();
+
+ String llcBrokerListKey = Kafka2StreamConfigProperties
+ .constructStreamProperty(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST);
+ String llcBufferKey = Kafka2StreamConfigProperties
+ .constructStreamProperty(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE);
+ String llcTimeoutKey = Kafka2StreamConfigProperties
+ .constructStreamProperty(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT);
+ String fetcherSizeKey = Kafka2StreamConfigProperties
+ .constructStreamProperty(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_SIZE_BYTES);
+ String fetcherMinBytesKey = Kafka2StreamConfigProperties
+ .constructStreamProperty(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES);
+ _bootstrapHosts = _streamConfigMap.get(llcBrokerListKey);
+ _kafkaBufferSize = getIntConfigWithDefault(_streamConfigMap, llcBufferKey,
+ Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT);
+ _kafkaSocketTimeout = getIntConfigWithDefault(_streamConfigMap, llcTimeoutKey,
+ Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT);
+ _kafkaFetcherSizeBytes = getIntConfigWithDefault(_streamConfigMap, fetcherSizeKey, _kafkaBufferSize);
+ _kafkaFetcherMinBytes = getIntConfigWithDefault(_streamConfigMap, fetcherMinBytesKey,
+ Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT);
+ Preconditions.checkNotNull(_bootstrapHosts,
+ "Must specify kafka brokers list " + llcBrokerListKey + " in case of low level kafka consumer");
+ }
+
+ public String getKafkaTopicName() {
+ return _kafkaTopicName;
+ }
+
+ public String getBootstrapHosts() {
+ return _bootstrapHosts;
+ }
+
+ public int getKafkaBufferSize() {
+ return _kafkaBufferSize;
+ }
+
+ public int getKafkaSocketTimeout() {
+ return _kafkaSocketTimeout;
+ }
+
+ public int getKafkaFetcherSizeBytes() {
+ return _kafkaFetcherSizeBytes;
+ }
+
+ public int getKafkaFetcherMinBytes() {
+ return _kafkaFetcherMinBytes;
+ }
+
+ private int getIntConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) {
+ String stringValue = configMap.get(key);
+ try {
+ if (StringUtils.isNotEmpty(stringValue)) {
+ return Integer.parseInt(stringValue);
+ }
+ return defaultValue;
+ } catch (NumberFormatException ex) {
+ return defaultValue;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "KafkaLowLevelStreamConfig{" + "_kafkaTopicName='" + _kafkaTopicName + '\'' + ", _bootstrapHosts='"
+ + _bootstrapHosts + '\'' + ", _kafkaBufferSize='" + _kafkaBufferSize + '\'' + ", _kafkaSocketTimeout='"
+ + _kafkaSocketTimeout + '\'' + ", _kafkaFetcherSizeBytes='" + _kafkaFetcherSizeBytes + '\''
+ + ", _kafkaFetcherMinBytes='" + _kafkaFetcherMinBytes + '\'' + '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (EqualityUtils.isSameReference(this, o)) {
+ return true;
+ }
+
+ if (EqualityUtils.isNullOrNotSameClass(this, o)) {
+ return false;
+ }
+
+ Kafka2PartitionLevelStreamConfig that = (Kafka2PartitionLevelStreamConfig) o;
+
+ return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName) && EqualityUtils
+ .isEqual(_bootstrapHosts, that._bootstrapHosts) && EqualityUtils
+ .isEqual(_kafkaBufferSize, that._kafkaBufferSize) && EqualityUtils
+ .isEqual(_kafkaSocketTimeout, that._kafkaSocketTimeout) && EqualityUtils
+ .isEqual(_kafkaFetcherSizeBytes, that._kafkaFetcherSizeBytes) && EqualityUtils
+ .isEqual(_kafkaFetcherMinBytes, that._kafkaFetcherMinBytes);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = EqualityUtils.hashCodeOf(_kafkaTopicName);
+ result = EqualityUtils.hashCodeOf(result, _bootstrapHosts);
+ result = EqualityUtils.hashCodeOf(result, _kafkaBufferSize);
+ result = EqualityUtils.hashCodeOf(result, _kafkaSocketTimeout);
+ result = EqualityUtils.hashCodeOf(result, _kafkaFetcherSizeBytes);
+ result = EqualityUtils.hashCodeOf(result, _kafkaFetcherMinBytes);
+ return result;
+ }
+}
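A small self-contained sketch of the defaulting behavior implemented by getIntConfigWithDefault() above: missing, empty, or unparseable int configs silently fall back to the connector default. The key names are assumptions for illustration:

import java.util.HashMap;
import java.util.Map;

public class IntConfigDefaultSketch {
  static int getIntConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) {
    String stringValue = configMap.get(key);
    if (stringValue == null || stringValue.isEmpty()) {
      return defaultValue;
    }
    try {
      return Integer.parseInt(stringValue);
    } catch (NumberFormatException ex) {
      return defaultValue;
    }
  }

  public static void main(String[] args) {
    Map<String, String> configMap = new HashMap<>();
    configMap.put("stream.kafka.buffer.size", "65536");   // key names are assumptions
    configMap.put("stream.kafka.socket.timeout", "oops"); // unparseable -> default
    System.out.println(getIntConfigWithDefault(configMap, "stream.kafka.buffer.size", 512000));   // 65536
    System.out.println(getIntConfigWithDefault(configMap, "stream.kafka.socket.timeout", 10000)); // 10000
    System.out.println(getIntConfigWithDefault(configMap, "stream.kafka.fetcher.minBytes", 100000)); // 100000
  }
}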
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamMetadataProvider.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamMetadataProvider.java
new file mode 100644
index 0000000..7a0558d
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2PartitionLevelStreamMetadataProvider.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nonnull;
+import org.apache.pinot.core.realtime.stream.OffsetCriteria;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
+
+
+public class Kafka2PartitionLevelStreamMetadataProvider extends Kafka2PartitionLevelConnectionHandler implements StreamMetadataProvider {
+
+ public Kafka2PartitionLevelStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
+ this(clientId, streamConfig, Integer.MIN_VALUE);
+ }
+
+ public Kafka2PartitionLevelStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) {
+ super(clientId, streamConfig, partition);
+ }
+
+ @Override
+ public int fetchPartitionCount(long timeoutMillis) {
+ return _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size();
+ }
+
+ @Override
+ public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)
+ throws TimeoutException {
+ Preconditions.checkNotNull(offsetCriteria);
+ if (offsetCriteria.isLargest()) {
+ return _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+ .get(_topicPartition);
+ } else if (offsetCriteria.isSmallest()) {
+ return _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+ .get(_topicPartition);
+ } else {
+ throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria.toString());
+ }
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ super.close();
+ }
+}
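A standalone sketch of the offset lookups above, against placeholder broker/topic values. The Duration overloads of beginningOffsets(), endOffsets() and partitionsFor() used here were introduced in kafka-clients 2.0, which is what this connector targets:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetLookupSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    try (Consumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
      TopicPartition tp = new TopicPartition("myTopic", 0);
      Duration timeout = Duration.ofMillis(10000);
      // "smallest" maps to beginningOffsets(), "largest" to endOffsets()
      long smallest = consumer.beginningOffsets(Collections.singletonList(tp), timeout).get(tp);
      long largest = consumer.endOffsets(Collections.singletonList(tp), timeout).get(tp);
      int partitionCount = consumer.partitionsFor("myTopic", timeout).size();
      System.out.println("smallest=" + smallest + " largest=" + largest + " partitions=" + partitionCount);
    }
  }
}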
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamConfigProperties.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamConfigProperties.java
similarity index 76%
rename from pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamConfigProperties.java
rename to pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamConfigProperties.java
index 3c45d6e..ed27dfc 100644
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamConfigProperties.java
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamConfigProperties.java
@@ -25,19 +25,22 @@ import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
/**
* Property key definitions for all kafka stream related properties
*/
-public class KafkaStreamConfigProperties {
+public class Kafka2StreamConfigProperties {
public static final String DOT_SEPARATOR = ".";
- public static final String STREAM_TYPE = "kafka";
+ public static final String KAFKA_CONSUMER_PROP_PREFIX = "kafka.consumer.prop";
+
+ /**
+ * Helper method to create a property string for kafka stream
+ * @param property
+ * @return
+ */
+ public static String constructStreamProperty(String property) {
+ return Joiner.on(DOT_SEPARATOR).join(StreamConfigProperties.STREAM_PREFIX, property);
+ }
public static class HighLevelConsumer {
- public static final String KAFKA_HLC_ZK_CONNECTION_STRING = "kafka.hlc.zk.connect.string";
- public static final String ZK_SESSION_TIMEOUT_MS = "zookeeper.session.timeout.ms";
- public static final String ZK_CONNECTION_TIMEOUT_MS = "zookeeper.connection.timeout.ms";
- public static final String ZK_SYNC_TIME_MS = "zookeeper.sync.time.ms";
- public static final String REBALANCE_MAX_RETRIES = "rebalance.max.retries";
- public static final String REBALANCE_BACKOFF_MS = "rebalance.backoff.ms";
+ public static final String KAFKA_HLC_BOOTSTRAP_SERVER = "kafka.hlc.bootstrap.server";
public static final String AUTO_COMMIT_ENABLE = "auto.commit.enable";
- public static final String AUTO_OFFSET_RESET = "auto.offset.reset";
}
public static class LowLevelConsumer {
@@ -50,16 +53,5 @@ public class KafkaStreamConfigProperties {
public static final String KAFKA_FETCHER_MIN_BYTES = "kafka.fetcher.minBytes";
public static final int KAFKA_FETCHER_MIN_BYTES_DEFAULT = 100000;
}
-
- public static final String KAFKA_CONSUMER_PROP_PREFIX = "kafka.consumer.prop";
-
- /**
- * Helper method to create a property string for kafka stream
- * @param property
- * @return
- */
- public static String constructStreamProperty(String property) {
- return Joiner.on(DOT_SEPARATOR).join(StreamConfigProperties.STREAM_PREFIX, property);
- }
}
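For concreteness, a tiny sketch of what constructStreamProperty() produces, assuming StreamConfigProperties.STREAM_PREFIX is "stream" (that constant is defined outside this diff, so its value is an assumption here):

import com.google.common.base.Joiner;

public class StreamPropertyKeySketch {
  public static void main(String[] args) {
    String streamPrefix = "stream"; // assumed value of StreamConfigProperties.STREAM_PREFIX
    // Mirrors constructStreamProperty("kafka.broker.list")
    System.out.println(Joiner.on(".").join(streamPrefix, "kafka.broker.list")); // stream.kafka.broker.list
  }
}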
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamLevelConsumer.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamLevelConsumer.java
new file mode 100644
index 0000000..4bbf975
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/Kafka2StreamLevelConsumer.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.yammer.metrics.core.Meter;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.core.realtime.stream.StreamDecoderProvider;
+import org.apache.pinot.core.realtime.stream.StreamLevelConsumer;
+import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An implementation of a {@link StreamLevelConsumer} which consumes from the kafka stream
+ */
+public class Kafka2StreamLevelConsumer implements StreamLevelConsumer {
+
+ private StreamMessageDecoder _messageDecoder;
+ private Logger INSTANCE_LOGGER;
+
+ private String _clientId;
+ private String _tableAndStreamName;
+
+ private StreamConfig _streamConfig;
+ private Kafka2HighLevelStreamConfig _kafka2HighLevelStreamConfig;
+
+ private KafkaConsumer<byte[], byte[]> consumer;
+ private ConsumerRecords<byte[], byte[]> consumerRecords;
+ private Iterator<ConsumerRecord<byte[], byte[]>> kafkaIterator;
+ private Map<Integer, Long> consumerOffsets = new HashMap<>(); // tracking current consumed records offsets.
+
+ private long lastLogTime = 0;
+ private long lastCount = 0;
+ private long currentCount = 0L;
+
+ private ServerMetrics _serverMetrics;
+ private Meter tableAndStreamRowsConsumed = null;
+ private Meter tableRowsConsumed = null;
+
+ public Kafka2StreamLevelConsumer(String clientId, String tableName, StreamConfig streamConfig, Schema schema,
+ InstanceZKMetadata instanceZKMetadata, ServerMetrics serverMetrics) {
+ _clientId = clientId;
+ _streamConfig = streamConfig;
+ _kafka2HighLevelStreamConfig = new Kafka2HighLevelStreamConfig(streamConfig, tableName, instanceZKMetadata);
+ _serverMetrics = serverMetrics;
+
+ _messageDecoder = StreamDecoderProvider.create(streamConfig, schema);
+
+ _tableAndStreamName = tableName + "-" + streamConfig.getTopicName();
+ INSTANCE_LOGGER = LoggerFactory
+ .getLogger(Kafka2StreamLevelConsumer.class.getName() + "_" + tableName + "_" + streamConfig.getTopicName());
+ }
+
+ @Override
+ public void start()
+ throws Exception {
+ consumer = Kafka2ConsumerManager.acquireKafkaConsumerForConfig(_kafka2HighLevelStreamConfig);
+ }
+
+ private void updateKafkaIterator() {
+ consumerRecords = consumer.poll(Duration.ofMillis(_streamConfig.getFetchTimeoutMillis()));
+ kafkaIterator = consumerRecords.iterator();
+ }
+
+ @Override
+ public GenericRow next(GenericRow destination) {
+ if (!kafkaIterator.hasNext()) {
+ updateKafkaIterator();
+ }
+ if (kafkaIterator.hasNext()) {
+ try {
+ final ConsumerRecord<byte[], byte[]> record = kafkaIterator.next();
+ updateOffsets(record.partition(), record.offset());
+ destination = _messageDecoder.decode(record.value(), destination);
+ tableAndStreamRowsConsumed = _serverMetrics
+ .addMeteredTableValue(_tableAndStreamName, ServerMeter.REALTIME_ROWS_CONSUMED, 1L,
+ tableAndStreamRowsConsumed);
+ tableRowsConsumed =
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L, tableRowsConsumed);
+
+ ++currentCount;
+
+ final long now = System.currentTimeMillis();
+ // Log every minute or 100k events
+ if (now - lastLogTime > 60000 || currentCount - lastCount >= 100000) {
+ if (lastCount == 0) {
+ INSTANCE_LOGGER.info("Consumed {} events from kafka stream {}", currentCount, _streamConfig.getTopicName());
+ } else {
+ INSTANCE_LOGGER.info("Consumed {} events from kafka stream {} (rate:{}/s)", currentCount - lastCount,
+ _streamConfig.getTopicName(), (float) (currentCount - lastCount) * 1000 / (now - lastLogTime));
+ }
+ lastCount = currentCount;
+ lastLogTime = now;
+ }
+ return destination;
+ } catch (Exception e) {
+ INSTANCE_LOGGER.warn("Caught exception while consuming events", e);
+ _serverMetrics.addMeteredTableValue(_tableAndStreamName, ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L);
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_CONSUMPTION_EXCEPTIONS, 1L);
+ throw e;
+ }
+ }
+ return null;
+ }
+
+ private void updateOffsets(int partition, long offset) {
+ consumerOffsets.put(partition, offset);
+ }
+
+ @Override
+ public void commit() {
+ consumer.commitSync(getOffsetsMap());
+ consumerOffsets.clear();
+ _serverMetrics.addMeteredTableValue(_tableAndStreamName, ServerMeter.REALTIME_OFFSET_COMMITS, 1L);
+ _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_OFFSET_COMMITS, 1L);
+ }
+
+ private Map<TopicPartition, OffsetAndMetadata> getOffsetsMap() {
+ Map<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
+ for (Integer partition : consumerOffsets.keySet()) {
+ offsetsMap.put(new TopicPartition(_streamConfig.getTopicName(), partition),
+ new OffsetAndMetadata(consumerOffsets.get(partition)));
+ }
+ return offsetsMap;
+ }
+
+ @Override
+ public void shutdown()
+ throws Exception {
+ if (consumer != null) {
+ // Release before clearing the reference; nulling first would hand a null consumer to the manager
+ Kafka2ConsumerManager.releaseKafkaConsumer(consumer);
+ consumer = null;
+ }
+ }
+}
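A standalone sketch of the consume-then-commitSync() pattern above, against placeholder broker/topic/group values. Note the Kafka convention of committing record.offset() + 1 (the next offset to consume); committing record.offset() itself, as the code above does, would replay the last consumed record per partition after a restart:

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ManualCommitSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("myTopic"));
      Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
      for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(5000))) {
        // ... decode and index record.value() here ...
        toCommit.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1)); // next offset to consume
      }
      consumer.commitSync(toCommit);
    }
  }
}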
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaAvroMessageDecoder.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaAvroMessageDecoder.java
new file mode 100644
index 0000000..5e09faf
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaAvroMessageDecoder.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.retry.RetryPolicies;
+import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.realtime.stream.AvroRecordToPinotRowGenerator;
+import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@NotThreadSafe
+public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaAvroMessageDecoder.class);
+
+ private static final String SCHEMA_REGISTRY_REST_URL = "schema.registry.rest.url";
+ private static final String SCHEMA_REGISTRY_SCHEMA_NAME = "schema.registry.schema.name";
+ private org.apache.avro.Schema defaultAvroSchema;
+ private MD5AvroSchemaMap md5ToAvroSchemaMap;
+
+ // A global cache for schemas across all threads.
+ private static final Map<String, org.apache.avro.Schema> globalSchemaCache = new HashMap<>();
+ // Suffix for getting the latest schema
+ private static final String LATEST = "-latest";
+
+ // Reusable byte[] to read MD5 from payload. This is OK as this class is used only by a single thread.
+ private final byte[] reusableMD5Bytes = new byte[SCHEMA_HASH_LENGTH];
+
+ private DecoderFactory decoderFactory;
+ private AvroRecordToPinotRowGenerator avroRecordConverter;
+
+ private static final int MAGIC_BYTE_LENGTH = 1;
+ private static final int SCHEMA_HASH_LENGTH = 16;
+ private static final int HEADER_LENGTH = MAGIC_BYTE_LENGTH + SCHEMA_HASH_LENGTH;
+
+ private static final int SCHEMA_HASH_START_OFFSET = MAGIC_BYTE_LENGTH;
+
+ private static final int MAXIMUM_SCHEMA_FETCH_RETRY_COUNT = 5;
+ private static final int MINIMUM_SCHEMA_FETCH_RETRY_TIME_MILLIS = 500;
+ private static final float SCHEMA_FETCH_RETRY_EXPONENTIAL_BACKOFF_FACTOR = 2.0f;
+
+ private String[] schemaRegistryUrls;
+
+ @Override
+ public void init(Map<String, String> props, Schema indexingSchema, String topicName)
+ throws Exception {
+ schemaRegistryUrls = parseSchemaRegistryUrls(props.get(SCHEMA_REGISTRY_REST_URL));
+
+ for (String schemaRegistryUrl : schemaRegistryUrls) {
+ StringUtils.chomp(schemaRegistryUrl, "/");
+ }
+
+ String avroSchemaName = topicName;
+ if (StringUtils.isNotEmpty(props.get(SCHEMA_REGISTRY_SCHEMA_NAME))) {
+ avroSchemaName = props.get(SCHEMA_REGISTRY_SCHEMA_NAME);
+ }
+ // With the logic below, defaultAvroSchema may not be the latest schema every time.
+ // It is fetched once when the machine starts and is not refreshed until the next restart.
+ // However, we always look at the exact MD5 hash and attempt to fetch the schema for that particular hash
+ // before decoding an incoming kafka event. defaultAvroSchema is used only if the fetch for the particular
+ // MD5 fails, and in that case the fetch is retried on every subsequent event.
+ synchronized (globalSchemaCache) {
+ final String hashKey = avroSchemaName + LATEST;
+ defaultAvroSchema = globalSchemaCache.get(hashKey);
+ if (defaultAvroSchema == null) {
+ defaultAvroSchema = fetchSchema("/latest_with_type=" + avroSchemaName);
+ globalSchemaCache.put(hashKey, defaultAvroSchema);
+ LOGGER.info("Populated schema cache with schema for {}", hashKey);
+ }
+ }
+ this.avroRecordConverter = new AvroRecordToPinotRowGenerator(indexingSchema);
+ this.decoderFactory = new DecoderFactory();
+ md5ToAvroSchemaMap = new MD5AvroSchemaMap();
+ }
+
+ @Override
+ public GenericRow decode(byte[] payload, GenericRow destination) {
+ return decode(payload, 0, payload.length, destination);
+ }
+
+ @Override
+ public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
+ // Messages shorter than the header (magic byte + MD5) cannot be decoded.
+ if (payload == null || length < HEADER_LENGTH) {
+ return null;
+ }
+
+ System.arraycopy(payload, SCHEMA_HASH_START_OFFSET + offset, reusableMD5Bytes, 0, SCHEMA_HASH_LENGTH);
+
+ boolean schemaUpdateFailed = false;
+ org.apache.avro.Schema schema = md5ToAvroSchemaMap.getSchema(reusableMD5Bytes);
+ if (schema == null) {
+ // We will get here for the first row consumed in the segment, and every row that has a schema ID that is
+ // not yet in md5ToAvroSchemaMap.
+ synchronized (globalSchemaCache) {
+ final String hashKey = hex(reusableMD5Bytes);
+ schema = globalSchemaCache.get(hashKey);
+ if (schema == null) {
+ // We will get here only if no partition of the table has populated the global schema cache.
+ // In that case, one of the consumers will fetch the schema and populate the cache, and the others
+ // should find it in the cache and populate their local maps from it.
+ final String schemaUri = "/id=" + hex(reusableMD5Bytes);
+ try {
+ schema = fetchSchema(schemaUri);
+ globalSchemaCache.put(hashKey, schema);
+ md5ToAvroSchemaMap.addSchema(reusableMD5Bytes, schema);
+ } catch (Exception e) {
+ schema = defaultAvroSchema;
+ LOGGER
+ .error("Error fetching schema using url {}. Attempting to continue with previous schema", schemaUri, e);
+ schemaUpdateFailed = true;
+ }
+ } else {
+ LOGGER.info("Found schema for {} in cache", hashKey);
+ md5ToAvroSchemaMap.addSchema(reusableMD5Bytes, schema);
+ }
+ }
+ }
+ DatumReader<Record> reader = new GenericDatumReader<Record>(schema);
+ try {
+ Record avroRecord = reader.read(null,
+ decoderFactory.createBinaryDecoder(payload, HEADER_LENGTH + offset, length - HEADER_LENGTH, null));
+ return avroRecordConverter.transform(avroRecord, destination);
+ } catch (IOException e) {
+ LOGGER.error("Caught exception while reading message using schema {}{}",
+ (schema == null ? "null" : schema.getName()),
+ (schemaUpdateFailed ? "(possibly due to schema update failure)" : ""), e);
+ return null;
+ }
+ }
+
+ private String hex(byte[] bytes) {
+ StringBuilder builder = new StringBuilder(2 * bytes.length);
+ for (byte aByte : bytes) {
+ String hexString = Integer.toHexString(0xFF & aByte);
+ if (hexString.length() < 2) {
+ hexString = "0" + hexString;
+ }
+ builder.append(hexString);
+ }
+ return builder.toString();
+ }
+
+ private static class SchemaFetcher implements Callable<Boolean> {
+ private org.apache.avro.Schema _schema;
+ private final URL _url;
+
+ SchemaFetcher(URL url) {
+ _url = url;
+ }
+
+ @Override
+ public Boolean call()
+ throws Exception {
+ try {
+ URLConnection conn = _url.openConnection();
+ conn.setConnectTimeout(15000);
+ conn.setReadTimeout(15000);
+ LOGGER.info("Fetching schema using url {}", _url);
+
+ StringBuilder queryResp = new StringBuilder();
+ try (BufferedReader reader = new BufferedReader(
+ new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
+ for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+ queryResp.append(line);
+ }
+ }
+
+ _schema = new org.apache.avro.Schema.Parser().parse(queryResp.toString());
+
+ LOGGER.info("Schema fetch succeeded on url {}", _url);
+ return Boolean.TRUE;
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while fetching schema", e);
+ return Boolean.FALSE;
+ }
+ }
+
+ public org.apache.avro.Schema getSchema() {
+ return _schema;
+ }
+ }
+
+ private org.apache.avro.Schema fetchSchema(String reference)
+ throws Exception {
+ SchemaFetcher schemaFetcher = new SchemaFetcher(makeRandomUrl(reference));
+ RetryPolicies
+ .exponentialBackoffRetryPolicy(MAXIMUM_SCHEMA_FETCH_RETRY_COUNT, MINIMUM_SCHEMA_FETCH_RETRY_TIME_MILLIS,
+ SCHEMA_FETCH_RETRY_EXPONENTIAL_BACKOFF_FACTOR).attempt(schemaFetcher);
+ return schemaFetcher.getSchema();
+ }
+
+ /**
+ * Private class for encapsulating MD5 to Avro schema mapping.
+ * <ul>
+ * <li> Maintains two lists, one for md5s and another for schema. </li>
+ * <li> MD5 at index i in the MD5 list, corresponds to Schema at index i in the schema list. </li>
+ * </ul>
+ */
+ private static class MD5AvroSchemaMap {
+ private List<byte[]> md5s;
+ private List<org.apache.avro.Schema> schemas;
+
+ /**
+ * Constructor for the class.
+ */
+ private MD5AvroSchemaMap() {
+ md5s = new ArrayList<>();
+ schemas = new ArrayList<>();
+ }
+
+ /**
+ * Returns the Avro schema corresponding to the given MD5.
+ *
+ * @param md5ForSchema MD5 for which to get the avro schema.
+ * @return Avro schema for the given MD5.
+ */
+ private org.apache.avro.Schema getSchema(byte[] md5ForSchema) {
+ for (int i = 0; i < md5s.size(); i++) {
+ if (Arrays.equals(md5s.get(i), md5ForSchema)) {
+ return schemas.get(i);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Adds mapping between MD5 and Avro schema.
+ * Caller to ensure that addSchema is called only once per MD5-Schema pair.
+ *
+ * @param md5 MD5 for the Schema
+ * @param schema Avro Schema
+ */
+ private void addSchema(byte[] md5, org.apache.avro.Schema schema) {
+ md5s.add(Arrays.copyOf(md5, md5.length));
+ schemas.add(schema);
+ }
+ }
+
+ protected URL makeRandomUrl(String reference)
+ throws MalformedURLException {
+ Random rand = new Random();
+ int randomInteger = rand.nextInt(schemaRegistryUrls.length);
+ return new URL(schemaRegistryUrls[randomInteger] + reference);
+ }
+
+ protected String[] parseSchemaRegistryUrls(String schemaConfig) {
+ return schemaConfig.split(",");
+ }
+}
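For reference, the header constants above pin down the wire format this decoder expects: one magic byte, then the 16-byte MD5 of the writer schema, then the Avro binary body. Below is a minimal producer-side framing sketch; the magic byte value (0x0) and hashing the schema's JSON text are assumptions for illustration, since only the header layout is prescribed by the decoder.

    // Hypothetical framing helper; magic byte value and MD5-over-schema-text are assumptions.
    import java.io.ByteArrayOutputStream;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.EncoderFactory;

    public class AvroMessageFramer {
      public static byte[] frame(org.apache.avro.Schema schema, GenericRecord record)
          throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x0);                                      // MAGIC_BYTE_LENGTH = 1
        out.write(MessageDigest.getInstance("MD5")
            .digest(schema.toString().getBytes(StandardCharsets.UTF_8)));  // SCHEMA_HASH_LENGTH = 16
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();                                     // Avro binary body starts at HEADER_LENGTH
        return out.toByteArray();
      }
    }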
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConnectionHandler.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConnectionHandler.java
deleted file mode 100644
index 802062f..0000000
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaConnectionHandler.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.realtime.impl.kafka2;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.BytesDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Properties;
-
-public abstract class KafkaConnectionHandler {
-
- protected final KafkaPartitionLevelStreamConfig _config;
- protected final int _partition;
- protected final String _topic;
- protected final Consumer<String, byte[]> _consumer;
- protected final TopicPartition _topicPartition;
-
- public KafkaConnectionHandler(StreamConfig streamConfig, int partition) {
- _config = new KafkaPartitionLevelStreamConfig(streamConfig);
- _partition = partition;
- _topic = _config.getKafkaTopicName();
- Properties consumerProp = new Properties();
- consumerProp.putAll(streamConfig.getStreamConfigsMap());
- consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
- consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
- _consumer = new KafkaConsumer<>(consumerProp);
- _topicPartition = new TopicPartition(_topic, _partition);
- _consumer.assign(Collections.singletonList(_topicPartition));
-
- }
-
- public void close() throws IOException {
- _consumer.close();
- }
-
-
-}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaJSONMessageDecoder.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaJSONMessageDecoder.java
new file mode 100644
index 0000000..8d1fd96
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaJSONMessageDecoder.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.realtime.stream.StreamMessageDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class KafkaJSONMessageDecoder implements StreamMessageDecoder<byte[]> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaJSONMessageDecoder.class);
+
+ private Schema schema;
+
+ @Override
+ public void init(Map<String, String> props, Schema indexingSchema, String topicName)
+ throws Exception {
+ this.schema = indexingSchema;
+ }
+
+ @Override
+ public GenericRow decode(byte[] payload, GenericRow destination) {
+ try {
+ JsonNode message = JsonUtils.bytesToJsonNode(payload);
+ for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
+ String column = fieldSpec.getName();
+ destination.putField(column, JsonUtils.extractValue(message.get(column), fieldSpec));
+ }
+ return destination;
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while decoding row, discarding row.", e);
+ return null;
+ }
+ }
+
+ @Override
+ public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
+ return decode(Arrays.copyOfRange(payload, offset, offset + length), destination);
+ }
+}
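A quick usage sketch for the JSON decoder; the schema, topic, and payload below are made up for illustration, and Schema.SchemaBuilder is assumed to be the pinot-common builder.

    // Illustrative only: decode one JSON payload into a GenericRow.
    // (Enclosing method declares throws Exception for decoder.init.)
    Schema schema = new Schema.SchemaBuilder().setSchemaName("example")
        .addSingleValueDimension("campaign", FieldSpec.DataType.STRING)
        .addMetric("clicks", FieldSpec.DataType.LONG)
        .build();
    KafkaJSONMessageDecoder decoder = new KafkaJSONMessageDecoder();
    decoder.init(java.util.Collections.emptyMap(), schema, "exampleTopic");
    byte[] payload = "{\"campaign\":\"summer\",\"clicks\":42}".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    GenericRow row = decoder.decode(payload, new GenericRow());
    // row now carries campaign="summer" and clicks=42; a malformed payload would return null.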
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaMessageBatch.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaMessageBatch.java
deleted file mode 100644
index 22aa683..0000000
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaMessageBatch.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.realtime.impl.kafka2;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.pinot.core.realtime.stream.MessageBatch;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class KafkaMessageBatch implements MessageBatch<byte[]> {
-
- private List<MessageAndOffset> messageList = new ArrayList<>();
-
- public KafkaMessageBatch(Iterable<ConsumerRecord<String, byte[]>> iterable) {
- for (ConsumerRecord<String, byte[]> record : iterable) {
- messageList.add(new MessageAndOffset(record.value(), record.offset()));
- }
- }
-
- @Override
- public int getMessageCount() {
- return messageList.size();
- }
-
- @Override
- public byte[] getMessageAtIndex(int index) {
- return messageList.get(index).getMessage().array();
- }
-
- @Override
- public int getMessageOffsetAtIndex(int index) {
- return messageList.get(index).getMessage().arrayOffset();
- }
-
- @Override
- public int getMessageLengthAtIndex(int index) {
- return messageList.get(index).getMessage().array().length;
- }
-
- @Override
- public long getNextStreamMessageOffsetAtIndex(int index) {
- return messageList.get(index).getNextOffset();
- }
-
- public Iterable<MessageAndOffset> iterable() {
- return messageList;
- }
-}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionConsumer.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionConsumer.java
deleted file mode 100644
index de3295d..0000000
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionConsumer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.realtime.impl.kafka2;
-
-import org.apache.kafka.clients.consumer.*;
-
-import org.apache.pinot.core.realtime.stream.MessageBatch;
-import org.apache.pinot.core.realtime.stream.PartitionLevelConsumer;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeoutException;
-
-public class KafkaPartitionConsumer extends KafkaConnectionHandler implements PartitionLevelConsumer {
-
-
- public KafkaPartitionConsumer(StreamConfig streamConfig, int partition) {
- super(streamConfig, partition);
- }
-
- @Override
- public MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis) throws TimeoutException {
- _consumer.seek(_topicPartition, startOffset);
-
- ConsumerRecords<String, byte[]> consumerRecords = _consumer.poll(null);
- List<ConsumerRecord<String, byte[]>> records = consumerRecords.records(_topicPartition);
- return new KafkaMessageBatch(records);
- }
-
- @Override
- public void close() throws IOException {
- super.close();
- }
-}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfig.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfig.java
deleted file mode 100644
index c154a38..0000000
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfig.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.realtime.impl.kafka2;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang.StringUtils;
-import org.apache.pinot.common.utils.EqualityUtils;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-
-import java.util.Map;
-import java.util.Properties;
-
-public class KafkaPartitionLevelStreamConfig {
-
- private final String _kafkaTopicName;
- private final String _bootstrapHosts;
- private final int _kafkaBufferSize;
- private final int _kafkaSocketTimeout;
- private final int _kafkaFetcherSizeBytes;
- private final int _kafkaFetcherMinBytes;
- private final Map<String, String> _streamConfigMap;
-
- /**
- * Builds a wrapper around {@link StreamConfig} to fetch kafka partition level consumer related configs
- * @param streamConfig
- */
- public KafkaPartitionLevelStreamConfig(StreamConfig streamConfig) {
- _streamConfigMap = streamConfig.getStreamConfigsMap();
-
- _kafkaTopicName = streamConfig.getTopicName();
-
- String llcBrokerListKey = KafkaStreamConfigProperties
- .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BROKER_LIST);
- String llcBufferKey = KafkaStreamConfigProperties
- .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE);
- String llcTimeoutKey = KafkaStreamConfigProperties
- .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT);
- String fetcherSizeKey = KafkaStreamConfigProperties
- .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_SIZE_BYTES);
- String fetcherMinBytesKey = KafkaStreamConfigProperties
- .constructStreamProperty(KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES);
- _bootstrapHosts = _streamConfigMap.get(llcBrokerListKey);
- _kafkaBufferSize = getIntConfigWithDefault(_streamConfigMap, llcBufferKey,
- KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT);
- _kafkaSocketTimeout = getIntConfigWithDefault(_streamConfigMap, llcTimeoutKey,
- KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT);
- _kafkaFetcherSizeBytes = getIntConfigWithDefault(_streamConfigMap, fetcherSizeKey, _kafkaBufferSize);
- _kafkaFetcherMinBytes = getIntConfigWithDefault(_streamConfigMap, fetcherMinBytesKey,
- KafkaStreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT);
- Preconditions.checkNotNull(_bootstrapHosts,
- "Must specify kafka brokers list " + llcBrokerListKey + " in case of low level kafka consumer");
- }
-
- public String getKafkaTopicName() {
- return _kafkaTopicName;
- }
-
- public String getBootstrapHosts() {
- return _bootstrapHosts;
- }
-
- public int getKafkaBufferSize() {
- return _kafkaBufferSize;
- }
-
- public int getKafkaSocketTimeout() {
- return _kafkaSocketTimeout;
- }
-
- public int getKafkaFetcherSizeBytes() {
- return _kafkaFetcherSizeBytes;
- }
-
- public int getKafkaFetcherMinBytes() {
- return _kafkaFetcherMinBytes;
- }
-
- private int getIntConfigWithDefault(Map<String, String> configMap, String key, int defaultValue) {
- String stringValue = configMap.get(key);
- try {
- if (StringUtils.isNotEmpty(stringValue)) {
- return Integer.parseInt(stringValue);
- }
- return defaultValue;
- } catch (NumberFormatException ex) {
- return defaultValue;
- }
- }
-
- @Override
- public String toString() {
- return "KafkaLowLevelStreamConfig{" + "_kafkaTopicName='" + _kafkaTopicName + '\'' + ", _bootstrapHosts='"
- + _bootstrapHosts + '\'' + ", _kafkaBufferSize='" + _kafkaBufferSize + '\'' + ", _kafkaSocketTimeout='"
- + _kafkaSocketTimeout + '\'' + ", _kafkaFetcherSizeBytes='" + _kafkaFetcherSizeBytes + '\'' + ", _kafkaFetcherMinBytes='"
- + _kafkaFetcherMinBytes + '\'' + '}';
- }
-
- @Override
- public boolean equals(Object o) {
- if (EqualityUtils.isSameReference(this, o)) {
- return true;
- }
-
- if (EqualityUtils.isNullOrNotSameClass(this, o)) {
- return false;
- }
-
- KafkaPartitionLevelStreamConfig that = (KafkaPartitionLevelStreamConfig) o;
-
- return EqualityUtils.isEqual(_kafkaTopicName, that._kafkaTopicName) && EqualityUtils
- .isEqual(_bootstrapHosts, that._bootstrapHosts) && EqualityUtils
- .isEqual(_kafkaBufferSize, that._kafkaBufferSize) && EqualityUtils
- .isEqual(_kafkaSocketTimeout, that._kafkaSocketTimeout) && EqualityUtils
- .isEqual(_kafkaFetcherSizeBytes, that._kafkaFetcherSizeBytes) && EqualityUtils
- .isEqual(_kafkaFetcherMinBytes, that._kafkaFetcherMinBytes);
- }
-
- @Override
- public int hashCode() {
- int result = EqualityUtils.hashCodeOf(_kafkaTopicName);
- result = EqualityUtils.hashCodeOf(result, _bootstrapHosts);
- result = EqualityUtils.hashCodeOf(result, _kafkaBufferSize);
- result = EqualityUtils.hashCodeOf(result, _kafkaSocketTimeout);
- result = EqualityUtils.hashCodeOf(result, _kafkaFetcherSizeBytes);
- result = EqualityUtils.hashCodeOf(result, _kafkaFetcherMinBytes);
- return result;
- }
-}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamMetadataProvider.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamMetadataProvider.java
deleted file mode 100644
index 3871d85..0000000
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaStreamMetadataProvider.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.realtime.impl.kafka2;
-
-import com.google.common.base.Preconditions;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.DescribeTopicsResult;
-import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.KafkaFuture;
-import org.apache.pinot.core.realtime.stream.OffsetCriteria;
-import org.apache.pinot.core.realtime.stream.StreamConfig;
-import org.apache.pinot.core.realtime.stream.StreamMetadataProvider;
-
-import javax.annotation.Nonnull;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-
-public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implements StreamMetadataProvider {
-
- private AdminClient _adminClient;
-
- public KafkaStreamMetadataProvider(StreamConfig streamConfig, int partition) {
- super(streamConfig, partition);
- final Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
- _adminClient = AdminClient.create(props);
- }
-
- @Override
- public int fetchPartitionCount(long timeoutMillis) {
- DescribeTopicsResult result = _adminClient.describeTopics(Collections.singletonList(_config.getKafkaTopicName()));
- Map<String, KafkaFuture<TopicDescription>> values = result.values();
- KafkaFuture<TopicDescription> topicDescription = values.get(_config.getKafkaTopicName());
- try {
- return topicDescription.get().partitions().size();
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException("");
- }
- }
-
- @Override
- public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) throws TimeoutException {
-
- Preconditions.checkNotNull(offsetCriteria);
- if (offsetCriteria.isLargest()) {
- return _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis)).get(_topicPartition);
- } else if (offsetCriteria.isSmallest()) {
- return _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis)).get(_topicPartition);
- } else {
- throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria.toString());
- }
-
- }
-
- @Override
- public void close() throws IOException {
- super.close();
- }
-}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/MessageAndOffset.java b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/MessageAndOffset.java
index 0dea267..b5bdaba 100644
--- a/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/MessageAndOffset.java
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/main/java/org/apache/pinot/core/realtime/impl/kafka2/MessageAndOffset.java
@@ -20,30 +20,34 @@ package org.apache.pinot.core.realtime.impl.kafka2;
import java.nio.ByteBuffer;
+
public class MessageAndOffset {
- private ByteBuffer _message;
- private long _offset;
+ private ByteBuffer _message;
+ private long _offset;
+
+ public MessageAndOffset(byte[] message, long offset) {
+ this(ByteBuffer.wrap(message), offset);
+ }
- public MessageAndOffset(byte[] message, long offset) {
- _message = ByteBuffer.wrap(message);
- _offset = offset;
- }
+ public MessageAndOffset(ByteBuffer message, long offset) {
+ _message = message;
+ _offset = offset;
+ }
- public MessageAndOffset(ByteBuffer message, long offset) {
- _message = message;
- _offset = offset;
- }
+ public ByteBuffer getMessage() {
+ return _message;
+ }
- public ByteBuffer getMessage() {
- return _message;
- }
+ public long getOffset() {
+ return _offset;
+ }
- public long getOffset() {
- return _offset;
- }
+ public long getNextOffset() {
+ return getOffset() + 1;
+ }
- public long getNextOffset() {
- return _offset + 1;
- }
+ public int payloadSize() {
+ return getMessage().array().length;
+ }
}
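The offset arithmetic above follows the usual Kafka convention: the offset to resume from after a record is that record's own offset plus one. A tiny example:

    // Illustrative only.
    MessageAndOffset last = new MessageAndOffset("hello".getBytes(), 41L);
    long resumeOffset = last.getNextOffset();  // 42, the first offset of the next fetch
    int size = last.payloadSize();             // 5, the length of the wrapped payload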
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConsumerTest.java b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConsumerTest.java
new file mode 100644
index 0000000..cc28127
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelConsumerTest.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pinot.core.realtime.impl.kafka2.utils.MiniKafkaCluster;
+import org.apache.pinot.core.realtime.stream.OffsetCriteria;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for the KafkaPartitionLevelConsumer.
+ */
+public class KafkaPartitionLevelConsumerTest {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumerTest.class);
+ private static final long STABILIZE_SLEEP_DELAYS = 3000;
+ private static final String TEST_TOPIC_1 = "foo";
+ private static final String TEST_TOPIC_2 = "bar";
+ private static final int NUM_MSG_PRODUCED = 1000;
+
+ private static MiniKafkaCluster kafkaCluster;
+
+ @BeforeClass
+ public static void setup()
+ throws Exception {
+ kafkaCluster = new MiniKafkaCluster.Builder().newServer("0").build();
+ LOGGER.info("Trying to start MiniKafkaCluster");
+ kafkaCluster.start();
+ kafkaCluster.createTopic(TEST_TOPIC_1, 1, 1);
+ kafkaCluster.createTopic(TEST_TOPIC_2, 2, 1);
+ Thread.sleep(STABILIZE_SLEEP_DELAYS);
+ produceMsgToKafka();
+ Thread.sleep(STABILIZE_SLEEP_DELAYS);
+ }
+
+ private static void produceMsgToKafka() {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getKafkaBroker());
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "clientId");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ // Close the producer via try-with-resources so all messages are flushed before consumers start.
+ try (KafkaProducer<String, String> p = new KafkaProducer<>(props)) {
+ for (int i = 0; i < NUM_MSG_PRODUCED; i++) {
+ p.send(new ProducerRecord<>(TEST_TOPIC_1, "sample_msg_" + i));
+ p.send(new ProducerRecord<>(TEST_TOPIC_2, "sample_msg_" + i));
+ }
+ }
+ }
+
+ private static String getKafkaBroker() {
+ return "127.0.0.1:" + kafkaCluster.getKafkaServerPort(0);
+ }
+
+ @AfterClass
+ public static void shutDown()
+ throws Exception {
+ kafkaCluster.deleteTopic(TEST_TOPIC_1);
+ kafkaCluster.deleteTopic(TEST_TOPIC_2);
+ kafkaCluster.close();
+ }
+
+ @Test
+ public void testBuildConsumer()
+ throws Exception {
+ String streamType = "kafka";
+ String streamKafkaTopicName = "theTopic";
+ String streamKafkaBrokerList = "127.0.0.1:" + kafkaCluster.getKafkaServerPort(0);
+ String streamKafkaConsumerType = "simple";
+ String clientId = "clientId";
+
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", streamType);
+ streamConfigMap.put("stream.kafka.topic.name", streamKafkaTopicName);
+ streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+ streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name", Kafka2ConsumerFactory.class.getName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+ streamConfigMap.put("stream.kafka.fetcher.size", "10000");
+ streamConfigMap.put("stream.kafka.fetcher.minBytes", "20000");
+ StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+
+ // test default value
+ Kafka2PartitionLevelPartitionLevelConsumer kafkaSimpleStreamConsumer =
+ new Kafka2PartitionLevelPartitionLevelConsumer(clientId, streamConfig, 0);
+ kafkaSimpleStreamConsumer.fetchMessages(12345L, 23456L, 10000);
+
+ Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+ kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaBufferSize());
+ Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+ kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaSocketTimeout());
+
+ // test parsing values
+ Assert.assertEquals(10000,
+ kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaFetcherSizeBytes());
+ Assert
+ .assertEquals(20000, kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaFetcherMinBytes());
+
+ // test user defined values
+ streamConfigMap.put("stream.kafka.buffer.size", "100");
+ streamConfigMap.put("stream.kafka.socket.timeout", "1000");
+ streamConfig = new StreamConfig(streamConfigMap);
+ kafkaSimpleStreamConsumer = new Kafka2PartitionLevelPartitionLevelConsumer(clientId, streamConfig, 0);
+ kafkaSimpleStreamConsumer.fetchMessages(12345L, 23456L, 10000);
+ Assert.assertEquals(100, kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaBufferSize());
+ Assert.assertEquals(1000, kafkaSimpleStreamConsumer.getKafka2PartitionLevelStreamConfig().getKafkaSocketTimeout());
+ }
+
+ @Test
+ public void testGetPartitionCount() {
+ String streamType = "kafka";
+ String streamKafkaBrokerList = "127.0.0.1:" + kafkaCluster.getKafkaServerPort(0);
+ String streamKafkaConsumerType = "simple";
+ String clientId = "clientId";
+
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", streamType);
+ streamConfigMap.put("stream.kafka.topic.name", TEST_TOPIC_1);
+ streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+ streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name", Kafka2ConsumerFactory.class.getName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+ StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+
+ Kafka2PartitionLevelStreamMetadataProvider streamMetadataProvider =
+ new Kafka2PartitionLevelStreamMetadataProvider(clientId, streamConfig);
+ Assert.assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 1);
+
+ streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", streamType);
+ streamConfigMap.put("stream.kafka.topic.name", TEST_TOPIC_2);
+ streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+ streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name", Kafka2ConsumerFactory.class.getName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+ streamConfig = new StreamConfig(streamConfigMap);
+
+ streamMetadataProvider = new Kafka2PartitionLevelStreamMetadataProvider(clientId, streamConfig);
+ Assert.assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 2);
+ }
+
+ @Test
+ public void testFetchMessages()
+ throws Exception {
+ String streamType = "kafka";
+ String streamKafkaTopicName = "theTopic";
+ String streamKafkaBrokerList = "127.0.0.1:" + kafkaCluster.getKafkaServerPort(0);
+ String streamKafkaConsumerType = "simple";
+ String clientId = "clientId";
+
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", streamType);
+ streamConfigMap.put("stream.kafka.topic.name", streamKafkaTopicName);
+ streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+ streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name", Kafka2ConsumerFactory.class.getName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+ StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+
+ int partition = 0;
+ Kafka2PartitionLevelPartitionLevelConsumer kafkaSimpleStreamConsumer =
+ new Kafka2PartitionLevelPartitionLevelConsumer(clientId, streamConfig, partition);
+ kafkaSimpleStreamConsumer.fetchMessages(12345L, 23456L, 10000);
+ }
+
+ @Test
+ public void testFetchOffsets()
+ throws Exception {
+ testFetchOffsets(TEST_TOPIC_1);
+ testFetchOffsets(TEST_TOPIC_2);
+ }
+
+ private void testFetchOffsets(String topic)
+ throws Exception {
+ String streamType = "kafka";
+ String streamKafkaBrokerList = "127.0.0.1:" + kafkaCluster.getKafkaServerPort(0);
+ String streamKafkaConsumerType = "simple";
+ String clientId = "clientId";
+
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put("streamType", streamType);
+ streamConfigMap.put("stream.kafka.topic.name", topic);
+ streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
+ streamConfigMap.put("stream.kafka.consumer.type", streamKafkaConsumerType);
+ streamConfigMap.put("stream.kafka.consumer.factory.class.name", Kafka2ConsumerFactory.class.getName());
+ streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
+ StreamConfig streamConfig = new StreamConfig(streamConfigMap);
+
+ int numPartitions =
+ new Kafka2PartitionLevelStreamMetadataProvider(clientId, streamConfig).fetchPartitionCount(10000);
+ for (int partition = 0; partition < numPartitions; partition++) {
+ Kafka2PartitionLevelStreamMetadataProvider kafkaStreamMetadataProvider =
+ new Kafka2PartitionLevelStreamMetadataProvider(clientId, streamConfig, partition);
+ Assert.assertEquals(0, kafkaStreamMetadataProvider
+ .fetchPartitionOffset(new OffsetCriteria.OffsetCriteriaBuilder().withOffsetSmallest(), 10000));
+ Assert.assertEquals(NUM_MSG_PRODUCED / numPartitions, kafkaStreamMetadataProvider
+ .fetchPartitionOffset(new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest(), 10000));
+ }
+ }
+}
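For context, a caller drains a fetched batch through the MessageBatch interface from pinot-core; a sketch with illustrative offsets (the cast is needed because fetchMessages returns a raw MessageBatch here):

    // Illustrative only.
    MessageBatch batch = kafkaSimpleStreamConsumer.fetchMessages(startOffset, endOffset, 10000);
    for (int i = 0; i < batch.getMessageCount(); i++) {
      byte[] msg = (byte[]) batch.getMessageAtIndex(i);
      startOffset = batch.getNextStreamMessageOffsetAtIndex(i);  // resume point for the next fetch
    }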
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfigTest.java b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfigTest.java
new file mode 100644
index 0000000..df02b9f
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/KafkaPartitionLevelStreamConfigTest.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.core.realtime.stream.StreamConfig;
+import org.apache.pinot.core.realtime.stream.StreamConfigProperties;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class KafkaPartitionLevelStreamConfigTest {
+
+ private Kafka2PartitionLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer,
+ String socketTimeout) {
+ return getStreamConfig(topic, bootstrapHosts, buffer, socketTimeout, null, null);
+ }
+
+ private Kafka2PartitionLevelStreamConfig getStreamConfig(String topic, String bootstrapHosts, String buffer,
+ String socketTimeout, String fetcherSize, String fetcherMinBytes) {
+ Map<String, String> streamConfigMap = new HashMap<>();
+ String streamType = "kafka";
+ String consumerType = StreamConfig.ConsumerType.LOWLEVEL.toString();
+ String consumerFactoryClassName = Kafka2ConsumerFactory.class.getName();
+ String decoderClass = KafkaAvroMessageDecoder.class.getName();
+ streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
+ streamConfigMap
+ .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME),
+ topic);
+ streamConfigMap
+ .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ consumerType);
+ streamConfigMap.put(StreamConfigProperties
+ .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
+ consumerFactoryClassName);
+ streamConfigMap
+ .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+ decoderClass);
+ streamConfigMap.put("stream.kafka.broker.list", bootstrapHosts);
+ if (buffer != null) {
+ streamConfigMap.put("stream.kafka.buffer.size", buffer);
+ }
+ if (socketTimeout != null) {
+ streamConfigMap.put("stream.kafka.socket.timeout", socketTimeout);
+ }
+ if (fetcherSize != null) {
+ streamConfigMap.put("stream.kafka.fetcher.size", fetcherSize);
+ }
+ if (fetcherMinBytes != null) {
+ streamConfigMap.put("stream.kafka.fetcher.minBytes", fetcherMinBytes);
+ }
+ return new Kafka2PartitionLevelStreamConfig(new StreamConfig(streamConfigMap));
+ }
+
+ @Test
+ public void testGetKafkaTopicName() {
+ Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "", "", "");
+ Assert.assertEquals("topic", config.getKafkaTopicName());
+ }
+
+ @Test
+ public void testGetBootstrapHosts() {
+ Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", "");
+ Assert.assertEquals("host1", config.getBootstrapHosts());
+ }
+
+ @Test
+ public void testGetKafkaBufferSize() {
+ // test default
+ Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "host1", null, "");
+ Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+ config.getKafkaBufferSize());
+
+ config = getStreamConfig("topic", "host1", "", "");
+ Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+ config.getKafkaBufferSize());
+
+ config = getStreamConfig("topic", "host1", "bad value", "");
+ Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+ config.getKafkaBufferSize());
+
+ // correct config
+ config = getStreamConfig("topic", "host1", "100", "");
+ Assert.assertEquals(100, config.getKafkaBufferSize());
+ }
+
+ @Test
+ public void testGetKafkaSocketTimeout() {
+ // test default
+ Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", null);
+ Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+ config.getKafkaSocketTimeout());
+
+ config = getStreamConfig("topic", "host1", "", "");
+ Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+ config.getKafkaSocketTimeout());
+
+ config = getStreamConfig("topic", "host1", "", "bad value");
+ Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_SOCKET_TIMEOUT_DEFAULT,
+ config.getKafkaSocketTimeout());
+
+ // correct config
+ config = getStreamConfig("topic", "host1", "", "100");
+ Assert.assertEquals(100, config.getKafkaSocketTimeout());
+ }
+
+ @Test
+ public void testGetFetcherSize() {
+ // test default
+ Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", "", "", null);
+ Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_BUFFER_SIZE_DEFAULT,
+ config.getKafkaFetcherSizeBytes());
+
+ config = getStreamConfig("topic", "host1", "100", "", "", null);
+ Assert.assertEquals(100, config.getKafkaFetcherSizeBytes());
+
+ config = getStreamConfig("topic", "host1", "100", "", "bad value", null);
+ Assert.assertEquals(100, config.getKafkaFetcherSizeBytes());
+
+ // correct config
+ config = getStreamConfig("topic", "host1", "100", "", "200", null);
+ Assert.assertEquals(200, config.getKafkaFetcherSizeBytes());
+ }
+
+ @Test
+ public void testGetFetcherMinBytes() {
+ // test default
+ Kafka2PartitionLevelStreamConfig config = getStreamConfig("topic", "host1", "", "", "", null);
+ Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
+ config.getKafkaFetcherMinBytes());
+
+ config = getStreamConfig("topic", "host1", "", "", "", "");
+ Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
+ config.getKafkaFetcherMinBytes());
+
+ config = getStreamConfig("topic", "host1", "", "", "", "bad value");
+ Assert.assertEquals(Kafka2StreamConfigProperties.LowLevelConsumer.KAFKA_FETCHER_MIN_BYTES_DEFAULT,
+ config.getKafkaFetcherMinBytes());
+
+ // correct config
+ config = getStreamConfig("topic", "host1", "", "", "", "100");
+ Assert.assertEquals(100, config.getKafkaFetcherMinBytes());
+ }
+}
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/EmbeddedZooKeeper.java b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/EmbeddedZooKeeper.java
new file mode 100644
index 0000000..47370aa
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/EmbeddedZooKeeper.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2.utils;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+
+public class EmbeddedZooKeeper implements Closeable {
+
+ private static final int TICK_TIME = 500;
+ private final NIOServerCnxnFactory factory;
+ private final ZooKeeperServer zookeeper;
+ private final File tmpDir;
+ private final int port;
+
+ EmbeddedZooKeeper() throws IOException, InterruptedException {
+ this.tmpDir = Files.createTempDirectory(null).toFile();
+ this.factory = new NIOServerCnxnFactory();
+ this.zookeeper = new ZooKeeperServer(new File(tmpDir, "data"), new File(tmpDir, "log"),
+ TICK_TIME);
+ InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 0);
+ factory.configure(addr, 0);
+ factory.startup(zookeeper);
+ this.port = zookeeper.getClientPort();
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ public void close() throws IOException {
+ zookeeper.shutdown();
+ factory.shutdown();
+ FileUtils.deleteDirectory(tmpDir);
+ }
+}
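EmbeddedZooKeeper binds to an ephemeral port, so callers discover the connect string at runtime; a minimal sketch (the constructor is package-private, so this must live in the same package):

    // Illustrative only; enclosing method declares throws Exception.
    try (EmbeddedZooKeeper zk = new EmbeddedZooKeeper()) {
      String zkConnect = "127.0.0.1:" + zk.getPort();
      // ... point a broker's zookeeper.connect at zkConnect ...
    }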
diff --git a/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/MiniKafkaCluster.java b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/MiniKafkaCluster.java
new file mode 100644
index 0000000..3ec32fc
--- /dev/null
+++ b/pinot-connectors/pinot-connector-kafka-2.0/src/test/java/org/apache/pinot/core/realtime/impl/kafka2/utils/MiniKafkaCluster.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.kafka2.utils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.Seq;
+
+
+public final class MiniKafkaCluster implements Closeable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MiniKafkaCluster.class);
+ private final EmbeddedZooKeeper zkServer;
+ private final ArrayList<KafkaServer> kafkaServer;
+ private final Path tempDir;
+ private final AdminClient adminClient;
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private MiniKafkaCluster(List<String> brokerIds)
+ throws IOException, InterruptedException {
+ this.zkServer = new EmbeddedZooKeeper();
+ this.tempDir = Files.createTempDirectory(Paths.get(System.getProperty("java.io.tmpdir")), "mini-kafka-cluster");
+ this.kafkaServer = new ArrayList<>();
+ int port = 0;
+ for (String id : brokerIds) {
+ port = getAvailablePort();
+ KafkaConfig c = new KafkaConfig(createBrokerConfig(id, port));
+ Seq seq =
+ scala.collection.JavaConverters.collectionAsScalaIterableConverter(Collections.emptyList()).asScala().toSeq();
+ kafkaServer.add(new KafkaServer(c, Time.SYSTEM, Option.empty(), seq));
+ }
+ Properties props = new Properties();
+ // Bootstrapping off the last broker started is sufficient; any broker serves full cluster metadata.
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + port);
+ adminClient = AdminClient.create(props);
+ }
+
+ static int getAvailablePort() {
+ try {
+ try (ServerSocket socket = new ServerSocket(0)) {
+ return socket.getLocalPort();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to find available port to use", e);
+ }
+ }
+
+ private Properties createBrokerConfig(String nodeId, int port)
+ throws IOException {
+ Properties props = new Properties();
+ props.put("broker.id", nodeId);
+ props.put("port", Integer.toString(port));
+ props.put("log.dir", Files.createTempDirectory(tempDir, "broker-").toAbsolutePath().toString());
+ props.put("zookeeper.connect", "127.0.0.1:" + zkServer.getPort());
+ props.put("replica.socket.timeout.ms", "1500");
+ props.put("controller.socket.timeout.ms", "1500");
+ props.put("controlled.shutdown.enable", "true");
+ props.put("delete.topic.enable", "true");
+ props.put("auto.create.topics.enable", "true");
+ props.put("offsets.topic.replication.factor", "1");
+ props.put("controlled.shutdown.retry.backoff.ms", "100");
+ props.put("log.cleaner.dedupe.buffer.size", "2097152");
+ return props;
+ }
+
+ public void start() {
+ for (KafkaServer s : kafkaServer) {
+ s.startup();
+ }
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ for (KafkaServer s : kafkaServer) {
+ s.shutdown();
+ }
+ this.zkServer.close();
+ FileUtils.deleteDirectory(tempDir.toFile());
+ }
+
+ public EmbeddedZooKeeper getZkServer() {
+ return zkServer;
+ }
+
+ public List<KafkaServer> getKafkaServer() {
+ return kafkaServer;
+ }
+
+ public int getKafkaServerPort(int index) {
+ return kafkaServer.get(index).socketServer()
+ .boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT));
+ }
+
+ public AdminClient getAdminClient() {
+ return adminClient;
+ }
+
+ public boolean createTopic(String topicName, int numPartitions, int replicationFactor) {
+ NewTopic newTopic = new NewTopic(topicName, numPartitions, (short) replicationFactor);
+ CreateTopicsResult createTopicsResult = this.adminClient.createTopics(Arrays.asList(newTopic));
+ try {
+ createTopicsResult.all().get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOGGER.error("Failed to create Kafka topic: {}, Exception: {}", newTopic.toString(), e);
+ return false;
+ }
+ return true;
+ }
+
+ public boolean deleteTopic(String topicName) {
+ final DeleteTopicsResult deleteTopicsResult = this.adminClient.deleteTopics(Collections.singletonList(topicName));
+ try {
+ deleteTopicsResult.all().get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOGGER.error("Failed to delete Kafka topic: {}, Exception: {}", topicName, e);
+ return false;
+ }
+ return true;
+ }
+
+ public static class Builder {
+
+ private List<String> brokerIds = new ArrayList<>();
+
+ public Builder newServer(String brokerId) {
+ brokerIds.add(brokerId);
+ return this;
+ }
+
+ public MiniKafkaCluster build()
+ throws IOException, InterruptedException {
+ return new MiniKafkaCluster(brokerIds);
+ }
+ }
+}
\ No newline at end of file
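Putting it together, the cluster lifecycle used by the consumer test collapses to a few lines; MiniKafkaCluster is Closeable, so try-with-resources handles teardown (the topic name is illustrative):

    // Illustrative only; enclosing method declares throws Exception.
    try (MiniKafkaCluster cluster = new MiniKafkaCluster.Builder().newServer("0").build()) {
      cluster.start();
      cluster.createTopic("myTopic", 2, 1);  // 2 partitions, replication factor 1
      String bootstrapServers = "127.0.0.1:" + cluster.getKafkaServerPort(0);
      // ... produce and consume against bootstrapServers ...
      cluster.deleteTopic("myTopic");
    }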
diff --git a/pinot-connectors/pom.xml b/pinot-connectors/pom.xml
index 64d798d..1b45cc1 100644
--- a/pinot-connectors/pom.xml
+++ b/pinot-connectors/pom.xml
@@ -47,11 +47,23 @@
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-common</artifactId>
<scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-core</artifactId>
<scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- test -->
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org