You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/01/03 02:16:36 UTC
[incubator-pinot] branch sharded_consumer_type_support updated:
default methods to avoid interface changes
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/sharded_consumer_type_support by this push:
new 6f4336e default methods to avoid interface changes
6f4336e is described below
commit 6f4336e070522a60e787fc5b948d6e00c7eda1d8
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Sat Jan 2 18:16:09 2021 -0800
default methods to avoid interface changes
---
.../realtime/LLRealtimeSegmentDataManager.java | 3 +-
.../impl/fakestream/FakeStreamConsumerFactory.java | 8 +---
.../fakestream/FakeStreamMetadataProvider.java | 11 -----
...lakyConsumerRealtimeClusterIntegrationTest.java | 5 ---
.../stream/kafka09/KafkaConsumerFactory.java | 7 ---
.../kafka09/KafkaStreamMetadataProvider.java | 46 +------------------
.../kafka09/KafkaPartitionLevelConsumerTest.java | 2 +-
.../stream/kafka20/KafkaConsumerFactory.java | 7 ---
.../kafka20/KafkaStreamMetadataProvider.java | 52 ----------------------
.../spi/stream/PartitionGroupInfoFetcher.java | 4 +-
.../pinot/spi/stream/StreamConsumerFactory.java | 4 +-
.../pinot/spi/stream/StreamMetadataProvider.java | 36 ++++++++++++---
12 files changed, 43 insertions(+), 142 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 3d84738..7d82b4e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1240,7 +1240,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
int numPartitions = columnPartitionConfig.getNumPartitions();
try {
// fixme: get this from ideal state
- int numStreamPartitions = _streamMetadataProvider.getPartitionGroupInfoList(Collections.emptyList(), 5000).size();
+ int numStreamPartitions = _streamMetadataProvider
+ .getPartitionGroupInfoList(_clientId, _partitionLevelStreamConfig, Collections.emptyList(), 5000).size();
if (numStreamPartitions != numPartitions) {
segmentLogger.warn(
"Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions",
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index fbeb808..b0dc7eb 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -69,12 +69,6 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
return new FakeStreamMetadataProvider(_streamConfig);
}
-
- @Override
- public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
- return null;
- }
-
public static void main(String[] args)
throws Exception {
String clientId = "client_id_localhost_tester";
@@ -88,7 +82,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
// stream metadata provider
StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(clientId);
- int partitionCount = streamMetadataProvider.getPartitionGroupInfoList(Collections.emptyList(), 10_000).size();
+ int partitionCount = streamMetadataProvider.getPartitionGroupInfoList("clientId", streamConfig, Collections.emptyList(), 10_000).size();
System.out.println(partitionCount);
// Partition metadata provider
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
index 0de0ce2..61aa01f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
@@ -48,17 +48,6 @@ public class FakeStreamMetadataProvider implements StreamMetadataProvider {
return _numPartitions;
}
- @Override
- public List<PartitionGroupInfo> getPartitionGroupInfoList(
- List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis)
- throws TimeoutException {
- List<PartitionGroupInfo> partitionGroupMetadataList = new ArrayList<>();
- for (int i = 0; i < _numPartitions; i++) {
- partitionGroupMetadataList.add(new PartitionGroupInfo(i, fetchStreamPartitionOffset(_streamConfig.getOffsetCriteria(), 5000).toString()));
- }
- return partitionGroupMetadataList;
- }
-
public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) throws TimeoutException {
throw new UnsupportedOperationException("This method is deprecated");
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
index c7523e3..4503de0 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
@@ -119,10 +119,5 @@ public class FlakyConsumerRealtimeClusterIntegrationTest extends RealtimeCluster
public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
throw new UnsupportedOperationException();
}
-
- @Override
- public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
- return null;
- }
}
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
index fe5a461..615e354 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
@@ -19,8 +19,6 @@
package org.apache.pinot.plugin.stream.kafka09;
import java.util.Set;
-import org.apache.pinot.spi.stream.PartitionGroupConsumer;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamLevelConsumer;
@@ -52,9 +50,4 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
return new KafkaStreamMetadataProvider(clientId, _streamConfig);
}
-
- @Override
- public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
- return null;
- }
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java
index 2d0ad31..06ee697 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java
@@ -22,13 +22,9 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
-import java.time.Duration;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
@@ -40,8 +36,6 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.OffsetCriteria;
-import org.apache.pinot.spi.stream.PartitionGroupInfo;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -55,14 +49,13 @@ import org.slf4j.LoggerFactory;
public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implements StreamMetadataProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamMetadataProvider.class);
- private StreamConfig _streamConfig;
-
/**
* Create a partition specific metadata provider
+ * @param streamConfig
+ * @param partition
*/
public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) {
super(clientId, streamConfig, partition, new KafkaSimpleConsumerFactoryImpl());
- _streamConfig = streamConfig;
}
/**
@@ -71,21 +64,18 @@ public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implemen
*/
public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
super(clientId, streamConfig, new KafkaSimpleConsumerFactoryImpl());
- _streamConfig = streamConfig;
}
@VisibleForTesting
public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition,
KafkaSimpleConsumerFactory kafkaSimpleConsumerFactory) {
super(clientId, streamConfig, partition, kafkaSimpleConsumerFactory);
- _streamConfig = streamConfig;
}
@VisibleForTesting
public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig,
KafkaSimpleConsumerFactory kafkaSimpleConsumerFactory) {
super(clientId, streamConfig, kafkaSimpleConsumerFactory);
- _streamConfig = streamConfig;
}
/**
@@ -94,12 +84,7 @@ public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implemen
* @return
*/
@Override
- @Deprecated
public synchronized int fetchPartitionCount(long timeoutMillis) {
- return fetchPartitionCountInternal(timeoutMillis);
- }
-
- private int fetchPartitionCountInternal(long timeoutMillis) {
int unknownTopicReplyCount = 0;
final int MAX_UNKNOWN_TOPIC_REPLY_COUNT = 10;
int kafkaErrorCount = 0;
@@ -160,33 +145,6 @@ public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implemen
throw new TimeoutException();
}
- /**
- * Fetch the partitionGroupMetadata list.
- * @param currentPartitionGroupsMetadata In case of Kafka, each partition group contains a single partition.
- */
- @Override
- public List<PartitionGroupInfo> getPartitionGroupInfoList(
- List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis)
- throws java.util.concurrent.TimeoutException {
- int partitionCount = fetchPartitionCountInternal(timeoutMillis);
- List<PartitionGroupInfo> newPartitionGroupInfoList = new ArrayList<>(partitionCount);
-
- // add a PartitionGroupInfo into the list foreach partition already present in current.
- // the end checkpoint is set as checkpoint
- for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) {
- newPartitionGroupInfoList.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(),
- currentPartitionGroupMetadata.getEndCheckpoint()));
- }
- // add PartitiongroupInfo for new partitions
- // use offset criteria from stream config
- for (int i = currentPartitionGroupsMetadata.size(); i < partitionCount; i++) {
- StreamPartitionMsgOffset streamPartitionMsgOffset =
- fetchStreamPartitionOffset(_streamConfig.getOffsetCriteria(), 5000);
- newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset.toString()));
- }
- return newPartitionGroupInfoList;
- }
-
public synchronized long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)
throws java.util.concurrent.TimeoutException {
throw new UnsupportedOperationException("The use of this method s not supported");
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
index 9d3091e..90dc5ad 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
@@ -291,7 +291,7 @@ public class KafkaPartitionLevelConsumerTest {
KafkaStreamMetadataProvider streamMetadataProvider =
new KafkaStreamMetadataProvider(clientId, streamConfig, mockKafkaSimpleConsumerFactory);
- Assert.assertEquals(streamMetadataProvider.getPartitionGroupInfoList(Collections.emptyList(), 10000L), 2);
+ Assert.assertEquals(streamMetadataProvider.getPartitionGroupInfoList("clientId", streamConfig, Collections.emptyList(), 10000), 2);
}
@Test
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
index b6746ff..e0d1015 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
@@ -19,8 +19,6 @@
package org.apache.pinot.plugin.stream.kafka20;
import java.util.Set;
-import org.apache.pinot.spi.stream.PartitionGroupConsumer;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamLevelConsumer;
@@ -49,9 +47,4 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
return new KafkaStreamMetadataProvider(clientId, _streamConfig);
}
-
- @Override
- public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
- return new KafkaPartitionLevelConsumer(clientId, _streamConfig, metadata.getPartitionGroupId());
- }
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index 1d3162a..38c49f5 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -21,17 +21,11 @@ package org.apache.pinot.plugin.stream.kafka20;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import org.apache.kafka.common.TopicPartition;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.OffsetCriteria;
-import org.apache.pinot.spi.stream.PartitionGroupInfo;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -39,15 +33,12 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHandler implements StreamMetadataProvider {
- private StreamConfig _streamConfig;
-
public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
this(clientId, streamConfig, Integer.MIN_VALUE);
}
public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) {
super(clientId, streamConfig, partition);
- _streamConfig = streamConfig;
}
@Override
@@ -56,33 +47,6 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
return _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size();
}
- /**
- * Fetch the partitionGroupMetadata list.
- * @param currentPartitionGroupsMetadata In case of Kafka, each partition group contains a single partition.
- */
- @Override
- public List<PartitionGroupInfo> getPartitionGroupInfoList(
- List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis)
- throws TimeoutException {
- int partitionCount = _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size();
- List<PartitionGroupInfo> newPartitionGroupInfoList = new ArrayList<>(partitionCount);
-
- // add a PartitionGroupInfo into the list foreach partition already present in current.
- // the end checkpoint is set as checkpoint
- for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) {
- newPartitionGroupInfoList.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(),
- currentPartitionGroupMetadata.getEndCheckpoint()));
- }
- // add PartitiongroupInfo for new partitions
- // use offset criteria from stream config
- for (int i = currentPartitionGroupsMetadata.size(); i < partitionCount; i++) {
- StreamPartitionMsgOffset streamPartitionMsgOffset =
- fetchStreamPartitionOffsetInternal(i, _streamConfig.getOffsetCriteria(), 5000);
- newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset.toString()));
- }
- return newPartitionGroupInfoList;
- }
-
public synchronized long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)
throws java.util.concurrent.TimeoutException {
throw new UnsupportedOperationException("The use of this method is not supported");
@@ -105,22 +69,6 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
return new LongMsgOffset(offset);
}
- private StreamPartitionMsgOffset fetchStreamPartitionOffsetInternal(int partitionId, @Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) {
- Preconditions.checkNotNull(offsetCriteria);
- TopicPartition topicPartition = new TopicPartition(_topic, partitionId);
- long offset = -1;
- if (offsetCriteria.isLargest()) {
- offset = _consumer.endOffsets(Collections.singletonList(topicPartition), Duration.ofMillis(timeoutMillis))
- .get(topicPartition);
- } else if (offsetCriteria.isSmallest()) {
- offset = _consumer.beginningOffsets(Collections.singletonList(topicPartition), Duration.ofMillis(timeoutMillis))
- .get(topicPartition);
- } else {
- throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria.toString());
- }
- return new LongMsgOffset(offset);
- }
-
@Override
public void close()
throws IOException {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java
index d13be10..f2d3f17 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java
@@ -32,6 +32,7 @@ public class PartitionGroupInfoFetcher implements Callable<Boolean> {
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupInfoFetcher.class);
private List<PartitionGroupInfo> _partitionGroupInfoList;
+ private final StreamConfig _streamConfig;
private final List<PartitionGroupMetadata> _currentPartitionGroupMetadata;
private final StreamConsumerFactory _streamConsumerFactory;
private Exception _exception;
@@ -40,6 +41,7 @@ public class PartitionGroupInfoFetcher implements Callable<Boolean> {
public PartitionGroupInfoFetcher(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
_streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
_topicName = streamConfig.getTopicName();
+ _streamConfig = streamConfig;
_currentPartitionGroupMetadata = currentPartitionGroupMetadataList;
}
@@ -61,7 +63,7 @@ public class PartitionGroupInfoFetcher implements Callable<Boolean> {
String clientId = PartitionGroupInfoFetcher.class.getSimpleName() + "-" + _topicName;
try (
StreamMetadataProvider streamMetadataProvider = _streamConsumerFactory.createStreamMetadataProvider(clientId)) {
- _partitionGroupInfoList = streamMetadataProvider.getPartitionGroupInfoList(_currentPartitionGroupMetadata, /*maxWaitTimeMs=*/5000L);
+ _partitionGroupInfoList = streamMetadataProvider.getPartitionGroupInfoList(clientId, _streamConfig, _currentPartitionGroupMetadata, /*maxWaitTimeMs=*/5000);
if (_exception != null) {
// We had at least one failure, but succeeded now. Log an info
LOGGER.info("Successfully retrieved partition group info for topic {}", _topicName);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index db48a83..f993fed 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -75,5 +75,7 @@ public abstract class StreamConsumerFactory {
}
// creates a consumer which consumes from a partition group
- public abstract PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata);
+ public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
+ return createPartitionLevelConsumer(clientId, metadata.getPartitionGroupId());
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index f595ea3..572cd02 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -19,6 +19,8 @@
package org.apache.pinot.spi.stream;
import java.io.Closeable;
+import java.time.Duration;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
@@ -39,11 +41,6 @@ public interface StreamMetadataProvider extends Closeable {
@Deprecated
int fetchPartitionCount(long timeoutMillis);
- // takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new state
- List<PartitionGroupInfo> getPartitionGroupInfoList(
- List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis)
- throws TimeoutException;
-
// Issue 5953 Retain this interface for 0.5.0, remove in 0.6.0
@Deprecated
long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)
@@ -60,4 +57,33 @@ public interface StreamMetadataProvider extends Closeable {
long offset = fetchPartitionOffset(offsetCriteria, timeoutMillis);
return new LongMsgOffset(offset);
}
+
+ /**
+ * Fetch the partitionGroupMetadata list.
+ * @param currentPartitionGroupsMetadata In case of Kafka, each partition group contains a single partition.
+ */
+ default List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId, StreamConfig streamConfig,
+ List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int timeoutMillis)
+ throws TimeoutException {
+ int partitionCount = fetchPartitionCount(timeoutMillis);
+ List<PartitionGroupInfo> newPartitionGroupInfoList = new ArrayList<>(partitionCount);
+
+ // add a PartitionGroupInfo into the list foreach partition already present in current.
+ // the end checkpoint is set as checkpoint
+ for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) {
+ newPartitionGroupInfoList.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(),
+ currentPartitionGroupMetadata.getEndCheckpoint()));
+ }
+ // add PartitiongroupInfo for new partitions
+ // use offset criteria from stream config
+ StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+ for (int i = currentPartitionGroupsMetadata.size(); i < partitionCount; i++) {
+ StreamMetadataProvider partitionMetadataProvider =
+ streamConsumerFactory.createPartitionMetadataProvider(clientId, i);
+ StreamPartitionMsgOffset streamPartitionMsgOffset =
+ partitionMetadataProvider.fetchStreamPartitionOffset(streamConfig.getOffsetCriteria(), timeoutMillis);
+ newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset.toString()));
+ }
+ return newPartitionGroupInfoList;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org