You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/01/02 23:52:23 UTC
[incubator-pinot] 06/08: More controller side changes
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 1cb09cf59d2b4dd9b6ff79fdbe48775ae532ea60
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Thu Dec 31 15:39:44 2020 -0800
More controller side changes
---
.../helix/core/PinotHelixResourceManager.java | 4 +-
.../helix/core/PinotTableIdealStateBuilder.java | 16 +++---
.../realtime/PinotLLCRealtimeSegmentManager.java | 57 ++++++++--------------
.../PinotLLCRealtimeSegmentManagerTest.java | 4 +-
.../realtime/LLRealtimeSegmentDataManager.java | 2 +-
.../impl/fakestream/FakeStreamConsumerFactory.java | 2 +-
.../fakestream/FakeStreamMetadataProvider.java | 12 +++--
.../kafka09/KafkaStreamMetadataProvider.java | 36 ++++++++++----
.../kafka09/KafkaPartitionLevelConsumerTest.java | 4 +-
.../kafka20/KafkaStreamMetadataProvider.java | 3 +-
...Fetcher.java => PartitionGroupInfoFetcher.java} | 31 +++++-------
.../pinot/spi/stream/StreamMetadataProvider.java | 2 +-
12 files changed, 86 insertions(+), 87 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ebfbfa1..e80c06b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1337,7 +1337,7 @@ public class PinotHelixResourceManager {
idealState = PinotTableIdealStateBuilder
.buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState,
_enableBatchMessageMode);
- _pinotLLCRealtimeSegmentManager.setupNewShardedTable(realtimeTableConfig, idealState);
+ _pinotLLCRealtimeSegmentManager.setupNewTable(realtimeTableConfig, idealState);
LOGGER.info("Successfully setup table for SHARDED consumers for {} ", realtimeTableName);
} else {
@@ -1366,7 +1366,7 @@ public class PinotHelixResourceManager {
idealState = PinotTableIdealStateBuilder
.buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState,
_enableBatchMessageMode);
- _pinotLLCRealtimeSegmentManager.setupNewShardedTable(realtimeTableConfig, idealState);
+ _pinotLLCRealtimeSegmentManager.setupNewTable(realtimeTableConfig, idealState);
LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
} else {
LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index a7b3c9e..8b200bb 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -30,10 +30,10 @@ import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
-import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.PartitionGroupInfo;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
-import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
+import org.apache.pinot.spi.stream.PartitionGroupInfoFetcher;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -115,15 +115,15 @@ public class PinotTableIdealStateBuilder {
return idealState;
}
- public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig,
+ public static List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig,
List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
- PartitionGroupMetadataFetcher partitionGroupMetadataFetcher =
- new PartitionGroupMetadataFetcher(streamConfig, currentPartitionGroupMetadataList);
+ PartitionGroupInfoFetcher partitionGroupInfoFetcher =
+ new PartitionGroupInfoFetcher(streamConfig, currentPartitionGroupMetadataList);
try {
- RetryPolicies.noDelayRetryPolicy(3).attempt(partitionGroupMetadataFetcher);
- return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
+ RetryPolicies.noDelayRetryPolicy(3).attempt(partitionGroupInfoFetcher);
+ return partitionGroupInfoFetcher.getPartitionGroupInfoList();
} catch (Exception e) {
- Exception fetcherException = partitionGroupMetadataFetcher.getException();
+ Exception fetcherException = partitionGroupInfoFetcher.getException();
LOGGER.error("Could not get partition count for {}", streamConfig.getTopicName(), fetcherException);
throw new RuntimeException(fetcherException);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 423a0b2..d899b4c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -48,7 +48,6 @@ import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
@@ -208,7 +207,7 @@ public class PinotLLCRealtimeSegmentManager {
/**
* Sets up the realtime table ideal state for a table of consumer type SHARDED
*/
- public void setupNewShardedTable(TableConfig tableConfig, IdealState idealState) {
+ public void setupNewTable(TableConfig tableConfig, IdealState idealState) {
Preconditions.checkState(!_isStopping, "Segment manager is stopping");
String realtimeTableName = tableConfig.getTableName();
@@ -220,18 +219,8 @@ public class PinotLLCRealtimeSegmentManager {
new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
// get new partition groups and their metadata
- StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
- StreamMetadataProvider streamMetadataProvider = streamConsumerFactory
- .createStreamMetadataProvider(streamConfig.getTopicName() + "_" + System.currentTimeMillis());
-
- List<PartitionGroupInfo> newPartitionGroupMetadataList;
- try {
- newPartitionGroupMetadataList =
- streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 5000);
- } catch (TimeoutException e) {
- throw new IllegalStateException(e);
- }
- int numPartitionGroups = newPartitionGroupMetadataList.size();
+ List<PartitionGroupInfo> newPartitionGroupInfoList = getPartitionGroupInfoList(streamConfig, Collections.emptyList());
+ int numPartitionGroups = newPartitionGroupInfoList.size();
InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
@@ -242,7 +231,7 @@ public class PinotLLCRealtimeSegmentManager {
long currentTimeMs = getCurrentTimeMs();
Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
- for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupMetadataList) {
+ for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) {
String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo,
currentTimeMs, instancePartitions, numPartitionGroups, numReplicas);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
@@ -507,18 +496,10 @@ public class PinotLLCRealtimeSegmentManager {
List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
IngestionConfigUtils.getStreamConfigMap(tableConfig));
- StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
- StreamMetadataProvider streamMetadataProvider = streamConsumerFactory
- .createStreamMetadataProvider(streamConfig.getTopicName() + " " + System.currentTimeMillis());
// find new partition groups [A],[B],[C],[D]
- List<PartitionGroupInfo> newPartitionGroupMetadataList;
- try {
- newPartitionGroupMetadataList =
- streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 1000);
- } catch (TimeoutException e) {
- throw new IllegalStateException(e);
- }
+ List<PartitionGroupInfo> newPartitionGroupMetadataList =
+ getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
// create new segment metadata, only if it is not IN_PROGRESS in the current state
Map<Integer, PartitionGroupMetadata> currentGroupIdToMetadata = currentPartitionGroupMetadataList.stream().collect(
@@ -721,9 +702,9 @@ public class PinotLLCRealtimeSegmentManager {
}
@VisibleForTesting
- List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig,
+ List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig,
List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
- return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig, currentPartitionGroupMetadataList);
+ return PinotTableIdealStateBuilder.getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
}
@VisibleForTesting
@@ -843,7 +824,7 @@ public class PinotLLCRealtimeSegmentManager {
if (idealState.isEnabled()) {
List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
getCurrentPartitionGroupMetadataList(idealState);
- int numPartitions = getPartitionGroupMetadataList(streamConfig, currentPartitionGroupMetadataList).size();
+ int numPartitions = getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList).size();
return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, numPartitions);
} else {
LOGGER.info("Skipping LLC segments validation for disabled table: {}", realtimeTableName);
@@ -1005,7 +986,7 @@ public class PinotLLCRealtimeSegmentManager {
// and restart consumption from the same offset (if possible) or a newer offset (if realtime stream does not have the same offset).
// In latter case, report data loss.
for (Map.Entry<Integer, LLCRealtimeSegmentZKMetadata> entry : latestSegmentZKMetadataMap.entrySet()) {
- int partitionId = entry.getKey();
+ int partitionGroupId = entry.getKey();
LLCRealtimeSegmentZKMetadata latestSegmentZKMetadata = entry.getValue();
String latestSegmentName = latestSegmentZKMetadata.getSegmentName();
LLCSegmentName latestLLCSegmentName = new LLCSegmentName(latestSegmentName);
@@ -1049,10 +1030,10 @@ public class PinotLLCRealtimeSegmentManager {
StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getStartOffset());
// Start offset must be higher than the start offset of the stream
StreamPartitionMsgOffset partitionStartOffset =
- getPartitionOffset(streamConfig, OffsetCriteria.SMALLEST_OFFSET_CRITERIA, partitionId);
+ getPartitionOffset(streamConfig, OffsetCriteria.SMALLEST_OFFSET_CRITERIA, partitionGroupId);
if (partitionStartOffset.compareTo(startOffset) > 0) {
LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffset,
- partitionStartOffset, partitionId, realtimeTableName);
+ partitionStartOffset, partitionGroupId, realtimeTableName);
_controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
startOffset = partitionStartOffset;
}
@@ -1089,7 +1070,7 @@ public class PinotLLCRealtimeSegmentManager {
String previousConsumingSegment = null;
for (Map.Entry<String, Map<String, String>> segmentEntry : instanceStatesMap.entrySet()) {
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentEntry.getKey());
- if (llcSegmentName.getPartitionGroupId() == partitionId && segmentEntry.getValue()
+ if (llcSegmentName.getPartitionGroupId() == partitionGroupId && segmentEntry.getValue()
.containsValue(SegmentStateModel.CONSUMING)) {
previousConsumingSegment = llcSegmentName.getSegmentName();
break;
@@ -1098,7 +1079,7 @@ public class PinotLLCRealtimeSegmentManager {
if (previousConsumingSegment == null) {
LOGGER
.error("Failed to find previous CONSUMING segment for partition: {} of table: {}, potential data loss",
- partitionId, realtimeTableName);
+ partitionGroupId, realtimeTableName);
_controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
}
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, previousConsumingSegment, latestSegmentName,
@@ -1111,10 +1092,14 @@ public class PinotLLCRealtimeSegmentManager {
}
// Set up new partitions if not exist
- for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
- if (!latestSegmentZKMetadataMap.containsKey(partitionId)) {
+ List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
+ List<PartitionGroupInfo> partitionGroupInfoList =
+ getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
+ for (PartitionGroupInfo partitionGroupInfo : partitionGroupInfoList) {
+ int partitionGroupId = partitionGroupInfo.getPartitionGroupId();
+ if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
String newSegmentName =
- setupNewPartitionGroup(tableConfig, streamConfig, partitionId, currentTimeMs, instancePartitions, numPartitions,
+ setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo, currentTimeMs, instancePartitions, numPartitions,
numReplicas);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
instancePartitionsMap);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 743e719..42bdedc 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -50,7 +50,6 @@ import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignme
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.controller.util.SegmentCompletionUtils;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.core.realtime.impl.fakestream.FakePartitionGroupMetadata;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -60,6 +59,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupInfo;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.StreamConfig;
@@ -914,7 +914,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
}
@Override
- List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
+ List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
return IntStream.range(0, _numPartitions).mapToObj(FakePartitionGroupMetadata::new).collect(Collectors.toList());
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index f00facc..0cd1fba 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1239,7 +1239,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
int numPartitions = columnPartitionConfig.getNumPartitions();
try {
// fixme: get this from ideal state
- int numStreamPartitions = _streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 5000).size();
+ int numStreamPartitions = _streamMetadataProvider.getPartitionGroupInfoList(Collections.emptyList(), 5000).size();
if (numStreamPartitions != numPartitions) {
segmentLogger.warn(
"Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions",
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index 54be1b6..6121eef 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -88,7 +88,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
// stream metadata provider
StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(clientId);
- int partitionCount = streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 10_000).size();
+ int partitionCount = streamMetadataProvider.getPartitionGroupInfoList(Collections.emptyList(), 10_000).size();
System.out.println(partitionCount);
// Partition metadata provider
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
index c96d06a..0de0ce2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupInfo;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
@@ -35,9 +36,11 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
*/
public class FakeStreamMetadataProvider implements StreamMetadataProvider {
private final int _numPartitions;
+ private StreamConfig _streamConfig;
public FakeStreamMetadataProvider(StreamConfig streamConfig) {
_numPartitions = FakeStreamConfigUtils.getNumPartitions(streamConfig);
+ _streamConfig = streamConfig;
}
@Override
@@ -46,11 +49,12 @@ public class FakeStreamMetadataProvider implements StreamMetadataProvider {
}
@Override
- public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
- List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) {
- List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
+ public List<PartitionGroupInfo> getPartitionGroupInfoList(
+ List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis)
+ throws TimeoutException {
+ List<PartitionGroupInfo> partitionGroupMetadataList = new ArrayList<>();
for (int i = 0; i < _numPartitions; i++) {
- partitionGroupMetadataList.add(new FakePartitionGroupMetadata(i));
+ partitionGroupMetadataList.add(new PartitionGroupInfo(i, fetchStreamPartitionOffset(_streamConfig.getOffsetCriteria(), 5000).toString()));
}
return partitionGroupMetadataList;
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java
index 865ae96..2d0ad31 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupInfo;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
@@ -54,13 +55,14 @@ import org.slf4j.LoggerFactory;
public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implements StreamMetadataProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamMetadataProvider.class);
+ private StreamConfig _streamConfig;
+
/**
* Create a partition specific metadata provider
- * @param streamConfig
- * @param partition
*/
public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) {
super(clientId, streamConfig, partition, new KafkaSimpleConsumerFactoryImpl());
+ _streamConfig = streamConfig;
}
/**
@@ -69,18 +71,21 @@ public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implemen
*/
public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
super(clientId, streamConfig, new KafkaSimpleConsumerFactoryImpl());
+ _streamConfig = streamConfig;
}
@VisibleForTesting
public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition,
KafkaSimpleConsumerFactory kafkaSimpleConsumerFactory) {
super(clientId, streamConfig, partition, kafkaSimpleConsumerFactory);
+ _streamConfig = streamConfig;
}
@VisibleForTesting
public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig,
KafkaSimpleConsumerFactory kafkaSimpleConsumerFactory) {
super(clientId, streamConfig, kafkaSimpleConsumerFactory);
+ _streamConfig = streamConfig;
}
/**
@@ -156,19 +161,30 @@ public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implemen
}
/**
- * Fetch the partition group metadata list
+ * Fetch the partitionGroupMetadata list.
* @param currentPartitionGroupsMetadata In case of Kafka, each partition group contains a single partition.
- * Hence current partition groups are not needed to compute the new partition groups
*/
@Override
- public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
- @Nullable List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) {
+ public List<PartitionGroupInfo> getPartitionGroupInfoList(
+ List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis)
+ throws java.util.concurrent.TimeoutException {
int partitionCount = fetchPartitionCountInternal(timeoutMillis);
- List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>(partitionCount);
- for (int i = 0; i < partitionCount; i++) {
- partitionGroupMetadataList.add(new KafkaPartitionGroupMetadata(i));
+ List<PartitionGroupInfo> newPartitionGroupInfoList = new ArrayList<>(partitionCount);
+
+ // add a PartitionGroupInfo to the list for each partition group already present in the current metadata.
+ // the end checkpoint of the current partition group is passed as the checkpoint of the new PartitionGroupInfo
+ for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) {
+ newPartitionGroupInfoList.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(),
+ currentPartitionGroupMetadata.getEndCheckpoint()));
+ }
+ // add PartitionGroupInfo for new partitions
+ // use offset criteria from stream config
+ for (int i = currentPartitionGroupsMetadata.size(); i < partitionCount; i++) {
+ StreamPartitionMsgOffset streamPartitionMsgOffset =
+ fetchStreamPartitionOffset(_streamConfig.getOffsetCriteria(), 5000);
+ newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset.toString()));
}
- return partitionGroupMetadataList;
+ return newPartitionGroupInfoList;
}
public synchronized long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
index 43b72a8..9d3091e 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
@@ -267,7 +267,7 @@ public class KafkaPartitionLevelConsumerTest {
}
@Test
- public void testGetPartitionCount() {
+ public void testGetPartitionCount() throws Exception {
String streamType = "kafka";
String streamKafkaTopicName = "theTopic";
String streamKafkaBrokerList = "abcd:1234,bcde:2345";
@@ -291,7 +291,7 @@ public class KafkaPartitionLevelConsumerTest {
KafkaStreamMetadataProvider streamMetadataProvider =
new KafkaStreamMetadataProvider(clientId, streamConfig, mockKafkaSimpleConsumerFactory);
- Assert.assertEquals(streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 10000L), 2);
+ Assert.assertEquals(streamMetadataProvider.getPartitionGroupInfoList(Collections.emptyList(), 10000L), 2);
}
@Test
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index eb606f2..ef22b6a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -58,10 +58,9 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
/**
* Fetch the partitionGroupMetadata list.
* @param currentPartitionGroupsMetadata In case of Kafka, each partition group contains a single partition.
- * Hence current partition groups are not needed to compute the new partition groups
*/
@Override
- public List<PartitionGroupInfo> getPartitionGroupMetadataList(
+ public List<PartitionGroupInfo> getPartitionGroupInfoList(
List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis)
throws TimeoutException {
int partitionCount = _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size();
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java
similarity index 64%
rename from pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
rename to pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java
index e1ce1a6..d13be10 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java
@@ -27,27 +27,24 @@ import org.slf4j.LoggerFactory;
/**
* Fetches the partition count of a stream using the {@link StreamMetadataProvider}
*/
-public class PartitionGroupMetadataFetcher implements Callable<Boolean> {
+public class PartitionGroupInfoFetcher implements Callable<Boolean> {
- private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupInfoFetcher.class);
- private int _partitionCount = -1;
- private List<PartitionGroupMetadata> _partitionGroupMetadataList;
- private List<PartitionGroupMetadata> _currentPartitionGroupMetadata;
- private final StreamConfig _streamConfig;
- private StreamConsumerFactory _streamConsumerFactory;
+ private List<PartitionGroupInfo> _partitionGroupInfoList;
+ private final List<PartitionGroupMetadata> _currentPartitionGroupMetadata;
+ private final StreamConsumerFactory _streamConsumerFactory;
private Exception _exception;
private final String _topicName;
- public PartitionGroupMetadataFetcher(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
- _streamConfig = streamConfig;
- _streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig);
+ public PartitionGroupInfoFetcher(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
+ _streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
_topicName = streamConfig.getTopicName();
_currentPartitionGroupMetadata = currentPartitionGroupMetadataList;
}
- public List<PartitionGroupMetadata> getPartitionGroupMetadataList() {
- return _partitionGroupMetadataList;
+ public List<PartitionGroupInfo> getPartitionGroupInfoList() {
+ return _partitionGroupInfoList;
}
public Exception getException() {
@@ -55,21 +52,19 @@ public class PartitionGroupMetadataFetcher implements Callable<Boolean> {
}
/**
- * Callable to fetch the number of partitions of the stream given the stream metadata
- * @return
- * @throws Exception
+ * Callable to fetch the partition group info for the stream
*/
@Override
public Boolean call()
throws Exception {
- String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-" + _topicName;
+ String clientId = PartitionGroupInfoFetcher.class.getSimpleName() + "-" + _topicName;
try (
StreamMetadataProvider streamMetadataProvider = _streamConsumerFactory.createStreamMetadataProvider(clientId)) {
- _partitionGroupMetadataList = streamMetadataProvider.getPartitionGroupMetadataList(_currentPartitionGroupMetadata, /*maxWaitTimeMs=*/5000L);
+ _partitionGroupInfoList = streamMetadataProvider.getPartitionGroupInfoList(_currentPartitionGroupMetadata, /*maxWaitTimeMs=*/5000L);
if (_exception != null) {
// We had at least one failure, but succeeded now. Log an info
- LOGGER.info("Successfully retrieved partition count as {} for topic {}", _partitionCount, _topicName);
+ LOGGER.info("Successfully retrieved partition group info for topic {}", _topicName);
}
return Boolean.TRUE;
} catch (TransientConsumerException e) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index a9cd2d6..f595ea3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -40,7 +40,7 @@ public interface StreamMetadataProvider extends Closeable {
int fetchPartitionCount(long timeoutMillis);
// takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new state
- List<PartitionGroupInfo> getPartitionGroupMetadataList(
+ List<PartitionGroupInfo> getPartitionGroupInfoList(
List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis)
throws TimeoutException;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org