You are viewing a plain-text version of this content. The canonical HTML version is available at the archive link.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:42:53 UTC
[incubator-pinot] 03/47: Rename partitionId to partitionGroupId
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 3892fc417c2f7d07e15b78eae1e1b3dd09e60090
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Wed Dec 30 13:46:22 2020 -0800
Rename partitionId to partitionGroupId
---
.../segmentselector/RealtimeSegmentSelector.java | 2 +-
.../apache/pinot/common/utils/LLCSegmentName.java | 24 +++++-----
.../org/apache/pinot/common/utils/SegmentName.java | 2 +-
.../pinot/common/utils/SegmentNameBuilderTest.java | 6 +--
.../helix/core/PinotHelixResourceManager.java | 5 ++-
.../helix/core/PinotTableIdealStateBuilder.java | 15 ++++---
.../segment/RealtimeSegmentAssignment.java | 6 +--
.../RealtimeToOfflineSegmentsTaskGenerator.java | 4 +-
.../realtime/PinotLLCRealtimeSegmentManager.java | 51 ++++++++++++----------
.../SegmentSizeBasedFlushThresholdUpdater.java | 2 +-
.../PinotLLCRealtimeSegmentManagerTest.java | 16 ++++---
.../realtime/LLRealtimeSegmentDataManager.java | 34 ++++++++-------
.../manager/realtime/RealtimeTableDataManager.java | 13 +++---
.../realtime/LLRealtimeSegmentDataManagerTest.java | 10 ++---
.../fakestream/FakePartitionGroupMetadata.java | 48 ++++++++++++++++++++
.../impl/fakestream/FakeStreamConsumerFactory.java | 10 +----
.../fakestream/FakeStreamMetadataProvider.java | 15 ++++++-
...lakyConsumerRealtimeClusterIntegrationTest.java | 9 +---
...PartitionLLCRealtimeClusterIntegrationTest.java | 6 +--
.../stream/kafka09/KafkaConsumerFactory.java | 9 +---
.../kafka09/KafkaPartitionGroupMetadata.java | 48 ++++++++++++++++++++
.../kafka09/KafkaStreamMetadataProvider.java | 26 +++++++++++
.../kafka09/KafkaPartitionLevelConsumerTest.java | 2 +-
.../stream/kafka20/KafkaConsumerFactory.java | 9 +---
.../kafka20/KafkaPartitionGroupMetadata.java | 48 ++++++++++++++++++++
.../kafka20/KafkaStreamMetadataProvider.java | 21 +++++++++
...her.java => PartitionGroupMetadataFetcher.java} | 18 +++++---
.../pinot/spi/stream/PartitionOffsetFetcher.java | 15 ++++---
.../pinot/spi/stream/StreamConsumerFactory.java | 8 +---
.../pinot/spi/stream/StreamMetadataProvider.java | 9 +++-
30 files changed, 347 insertions(+), 144 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
index f462326..2d778c6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
@@ -95,7 +95,7 @@ public class RealtimeSegmentSelector implements SegmentSelector {
if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
// Keep the first CONSUMING segment for each partition
LLCSegmentName llcSegmentName = new LLCSegmentName(segment);
- partitionIdToFirstConsumingLLCSegmentMap.compute(llcSegmentName.getPartitionId(), (k, consumingSegment) -> {
+ partitionIdToFirstConsumingLLCSegmentMap.compute(llcSegmentName.getPartitionGroupId(), (k, consumingSegment) -> {
if (consumingSegment == null) {
return llcSegmentName;
} else {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
index adc24ad..a66bb3c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
@@ -26,7 +26,7 @@ import org.joda.time.DateTimeZone;
public class LLCSegmentName extends SegmentName implements Comparable {
private final static String DATE_FORMAT = "yyyyMMdd'T'HHmm'Z'";
private final String _tableName;
- private final int _partitionId;
+ private final int _partitionGroupId;
private final int _sequenceNumber;
private final String _creationTime;
private final String _segmentName;
@@ -39,22 +39,22 @@ public class LLCSegmentName extends SegmentName implements Comparable {
_segmentName = segmentName;
String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
_tableName = parts[0];
- _partitionId = Integer.parseInt(parts[1]);
+ _partitionGroupId = Integer.parseInt(parts[1]);
_sequenceNumber = Integer.parseInt(parts[2]);
_creationTime = parts[3];
}
- public LLCSegmentName(String tableName, int partitionId, int sequenceNumber, long msSinceEpoch) {
+ public LLCSegmentName(String tableName, int partitionGroupId, int sequenceNumber, long msSinceEpoch) {
if (!isValidComponentName(tableName)) {
throw new RuntimeException("Invalid table name " + tableName);
}
_tableName = tableName;
- _partitionId = partitionId;
+ _partitionGroupId = partitionGroupId;
_sequenceNumber = sequenceNumber;
// ISO8601 date: 20160120T1234Z
DateTime dateTime = new DateTime(msSinceEpoch, DateTimeZone.UTC);
_creationTime = dateTime.toString(DATE_FORMAT);
- _segmentName = tableName + SEPARATOR + partitionId + SEPARATOR + sequenceNumber + SEPARATOR + _creationTime;
+ _segmentName = tableName + SEPARATOR + partitionGroupId + SEPARATOR + sequenceNumber + SEPARATOR + _creationTime;
}
/**
@@ -75,13 +75,13 @@ public class LLCSegmentName extends SegmentName implements Comparable {
}
@Override
- public int getPartitionId() {
- return _partitionId;
+ public int getPartitionGroupId() {
+ return _partitionGroupId;
}
@Override
public String getPartitionRange() {
- return Integer.toString(getPartitionId());
+ return Integer.toString(getPartitionGroupId());
}
@Override
@@ -110,9 +110,9 @@ public class LLCSegmentName extends SegmentName implements Comparable {
throw new RuntimeException(
"Cannot compare segment names " + this.getSegmentName() + " and " + other.getSegmentName());
}
- if (this.getPartitionId() > other.getPartitionId()) {
+ if (this.getPartitionGroupId() > other.getPartitionGroupId()) {
return 1;
- } else if (this.getPartitionId() < other.getPartitionId()) {
+ } else if (this.getPartitionGroupId() < other.getPartitionGroupId()) {
return -1;
} else {
if (this.getSequenceNumber() > other.getSequenceNumber()) {
@@ -141,7 +141,7 @@ public class LLCSegmentName extends SegmentName implements Comparable {
LLCSegmentName segName = (LLCSegmentName) o;
- if (_partitionId != segName._partitionId) {
+ if (_partitionGroupId != segName._partitionGroupId) {
return false;
}
if (_sequenceNumber != segName._sequenceNumber) {
@@ -159,7 +159,7 @@ public class LLCSegmentName extends SegmentName implements Comparable {
@Override
public int hashCode() {
int result = _tableName != null ? _tableName.hashCode() : 0;
- result = 31 * result + _partitionId;
+ result = 31 * result + _partitionGroupId;
result = 31 * result + _sequenceNumber;
result = 31 * result + (_creationTime != null ? _creationTime.hashCode() : 0);
result = 31 * result + (_segmentName != null ? _segmentName.hashCode() : 0);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentName.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentName.java
index 6763f6d..b0c00ae 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentName.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentName.java
@@ -63,7 +63,7 @@ public abstract class SegmentName {
throw new RuntimeException("No groupId in " + getSegmentName());
}
- public int getPartitionId() {
+ public int getPartitionGroupId() {
throw new RuntimeException("No partitionId in " + getSegmentName());
}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentNameBuilderTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentNameBuilderTest.java
index f632f51..de606cc 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentNameBuilderTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentNameBuilderTest.java
@@ -58,7 +58,7 @@ public class SegmentNameBuilderTest {
// Check partition range
assertEquals(longNameSegment.getPartitionRange(), "0");
assertEquals(shortNameSegment.getPartitionRange(), "ALL");
- assertEquals(llcSegment.getPartitionId(), 0);
+ assertEquals(llcSegment.getPartitionGroupId(), 0);
// Check groupId
assertEquals(longNameSegment.getGroupId(), "myTable_REALTIME_1234567_0");
@@ -127,14 +127,14 @@ public class SegmentNameBuilderTest {
LLCSegmentName segName1 = new LLCSegmentName(tableName, partitionId, sequenceNumber, msSinceEpoch);
Assert.assertEquals(segName1.getSegmentName(), segmentName);
- Assert.assertEquals(segName1.getPartitionId(), partitionId);
+ Assert.assertEquals(segName1.getPartitionGroupId(), partitionId);
Assert.assertEquals(segName1.getCreationTime(), creationTime);
Assert.assertEquals(segName1.getSequenceNumber(), sequenceNumber);
Assert.assertEquals(segName1.getTableName(), tableName);
LLCSegmentName segName2 = new LLCSegmentName(segmentName);
Assert.assertEquals(segName2.getSegmentName(), segmentName);
- Assert.assertEquals(segName2.getPartitionId(), partitionId);
+ Assert.assertEquals(segName2.getPartitionGroupId(), partitionId);
Assert.assertEquals(segName2.getCreationTime(), creationTime);
Assert.assertEquals(segName2.getSequenceNumber(), sequenceNumber);
Assert.assertEquals(segName2.getTableName(), tableName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index fa117fa..a04e0bc 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -130,6 +130,7 @@ import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -1396,6 +1397,8 @@ public class PinotHelixResourceManager {
*/
private void setupShardedRealtimeTable(StreamConfig streamConfig, IdealState idealState, int numReplicas) {
StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+ StreamMetadataProvider streamMetadataProvider = streamConsumerFactory
+ .createStreamMetadataProvider(streamConfig.getTopicName() + "_" + System.currentTimeMillis());
// get current partition groups and their metadata - this will be empty when creating the table
List<PartitionGroupMetadata> currentPartitionGroupMetadataList = _pinotLLCRealtimeSegmentManager.getCurrentPartitionGroupMetadataList(idealState);
@@ -1403,7 +1406,7 @@ public class PinotHelixResourceManager {
// get new partition groups and their metadata,
// Assume table has 3 shards. Say we get [0], [1], [2] groups (for now assume that each group contains only 1 shard)
List<PartitionGroupMetadata> newPartitionGroupMetadataList =
- streamConsumerFactory.getPartitionGroupMetadataList(currentPartitionGroupMetadataList);
+ streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 5000);
// setup segment zk metadata and ideal state for all the new found partition groups
_pinotLLCRealtimeSegmentManager.setupNewPartitionGroups(newPartitionGroupMetadataList, numReplicas);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index a564542..1e95966 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -32,7 +32,8 @@ import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.stream.PartitionCountFetcher;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -115,13 +116,15 @@ public class PinotTableIdealStateBuilder {
pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
}
- public static int getPartitionCount(StreamConfig streamConfig) {
- PartitionCountFetcher partitionCountFetcher = new PartitionCountFetcher(streamConfig);
+ public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig,
+ List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
+ PartitionGroupMetadataFetcher partitionGroupMetadataFetcher =
+ new PartitionGroupMetadataFetcher(streamConfig, currentPartitionGroupMetadataList);
try {
- RetryPolicies.noDelayRetryPolicy(3).attempt(partitionCountFetcher);
- return partitionCountFetcher.getPartitionCount();
+ RetryPolicies.noDelayRetryPolicy(3).attempt(partitionGroupMetadataFetcher);
+ return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
} catch (Exception e) {
- Exception fetcherException = partitionCountFetcher.getException();
+ Exception fetcherException = partitionGroupMetadataFetcher.getException();
LOGGER.error("Could not get partition count for {}", streamConfig.getTopicName(), fetcherException);
throw new RuntimeException(fetcherException);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index a069734..e27958f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -136,7 +136,7 @@ public class RealtimeSegmentAssignment implements SegmentAssignment {
* Helper method to assign instances for CONSUMING segment based on the segment partition id and instance partitions.
*/
private List<String> assignConsumingSegment(String segmentName, InstancePartitions instancePartitions) {
- int partitionId = new LLCSegmentName(segmentName).getPartitionId();
+ int partitionId = new LLCSegmentName(segmentName).getPartitionGroupId();
int numReplicaGroups = instancePartitions.getNumReplicaGroups();
if (numReplicaGroups == 1) {
@@ -325,7 +325,7 @@ public class RealtimeSegmentAssignment implements SegmentAssignment {
Map<Integer, List<String>> partitionIdToSegmentsMap = new HashMap<>();
for (String segmentName : currentAssignment.keySet()) {
- int partitionId = new LLCSegmentName(segmentName).getPartitionId();
+ int partitionId = new LLCSegmentName(segmentName).getPartitionGroupId();
partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(segmentName);
}
@@ -360,7 +360,7 @@ public class RealtimeSegmentAssignment implements SegmentAssignment {
// Replica-group based assignment
// Uniformly spray the segment partitions over the instance partitions
- int segmentPartitionId = new LLCSegmentName(segmentName).getPartitionId();
+ int segmentPartitionId = new LLCSegmentName(segmentName).getPartitionGroupId();
int numPartitions = instancePartitions.getNumPartitions();
int partitionId = segmentPartitionId % numPartitions;
return SegmentAssignmentUtils.assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, partitionId);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
index a278396..8208d8e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -252,11 +252,11 @@ public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerato
Map<Integer, LLCSegmentName> latestLLCSegmentNameMap = new HashMap<>();
for (LLCRealtimeSegmentZKMetadata metadata : realtimeSegmentsMetadataList) {
LLCSegmentName llcSegmentName = new LLCSegmentName(metadata.getSegmentName());
- allPartitions.add(llcSegmentName.getPartitionId());
+ allPartitions.add(llcSegmentName.getPartitionGroupId());
if (metadata.getStatus().equals(Segment.Realtime.Status.DONE)) {
completedSegmentsMetadataList.add(metadata);
- latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionId(), (partitionId, latestLLCSegmentName) -> {
+ latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionGroupId(), (partitionId, latestLLCSegmentName) -> {
if (latestLLCSegmentName == null) {
return llcSegmentName;
} else {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 8a29489..189be8b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -83,6 +83,7 @@ import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -186,8 +187,10 @@ public class PinotLLCRealtimeSegmentManager {
// get new partition groups (honor any groupings which are already consuming - [0], [1], [2])
StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+ StreamMetadataProvider streamMetadataProvider = streamConsumerFactory
+ .createStreamMetadataProvider(streamConfig.getTopicName() + " " + System.currentTimeMillis());
List<PartitionGroupMetadata> newPartitionGroupMetadataList =
- streamConsumerFactory.getPartitionGroupMetadataList(currentPartitionGroupMetadataList);
+ streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 1000);
// from the above list, remove the partition groups which are already CONSUMING
// i.e. newPartitionGroups - currentPartitionGroups. Therefore, ([0], [1], [2]) - ([1], [2]) = ([0])
@@ -292,7 +295,8 @@ public class PinotLLCRealtimeSegmentManager {
PartitionLevelStreamConfig streamConfig =
new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
- int numPartitions = getNumPartitions(streamConfig);
+ List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
+ int numPartitionGroups = getPartitionGroupMetadataList(streamConfig, currentPartitionGroupMetadataList).size();
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
@@ -301,9 +305,9 @@ public class PinotLLCRealtimeSegmentManager {
long currentTimeMs = getCurrentTimeMs();
Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
- for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+ for (int partitionGroupId = 0; partitionGroupId < numPartitionGroups; partitionGroupId++) {
String segmentName =
- setupNewPartition(tableConfig, streamConfig, partitionId, currentTimeMs, instancePartitions, numPartitions,
+ setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupId, currentTimeMs, instancePartitions, numPartitionGroups,
numReplicas);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
instancePartitionsMap);
@@ -635,7 +639,7 @@ public class PinotLLCRealtimeSegmentManager {
// Add the partition metadata if available
SegmentPartitionMetadata partitionMetadata =
- getPartitionMetadataFromTableConfig(tableConfig, newLLCSegmentName.getPartitionId());
+ getPartitionMetadataFromTableConfig(tableConfig, newLLCSegmentName.getPartitionGroupId());
if (partitionMetadata != null) {
newSegmentZKMetadata.setPartitionMetadata(partitionMetadata);
}
@@ -705,22 +709,23 @@ public class PinotLLCRealtimeSegmentManager {
}
@VisibleForTesting
- int getNumPartitions(StreamConfig streamConfig) {
- return PinotTableIdealStateBuilder.getPartitionCount(streamConfig);
+ List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig,
+ List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
+ return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig, currentPartitionGroupMetadataList);
}
@VisibleForTesting
StreamPartitionMsgOffset getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria,
- int partitionId) {
+ int partitionGroupId) {
PartitionOffsetFetcher partitionOffsetFetcher =
- new PartitionOffsetFetcher(offsetCriteria, partitionId, streamConfig);
+ new PartitionOffsetFetcher(offsetCriteria, partitionGroupId, streamConfig);
try {
RetryPolicies.fixedDelayRetryPolicy(3, 1000L).attempt(partitionOffsetFetcher);
return partitionOffsetFetcher.getOffset();
} catch (Exception e) {
throw new IllegalStateException(String
.format("Failed to fetch the offset for topic: %s, partition: %s with criteria: %s",
- streamConfig.getTopicName(), partitionId, offsetCriteria), e);
+ streamConfig.getTopicName(), partitionGroupId, offsetCriteria), e);
}
}
@@ -768,7 +773,7 @@ public class PinotLLCRealtimeSegmentManager {
Map<Integer, LLCSegmentName> latestLLCSegmentNameMap = new HashMap<>();
for (String segmentName : segments) {
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
- latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionId(), (partitionId, latestLLCSegmentName) -> {
+ latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionGroupId(), (partitionId, latestLLCSegmentName) -> {
if (latestLLCSegmentName == null) {
return llcSegmentName;
} else {
@@ -821,10 +826,12 @@ public class PinotLLCRealtimeSegmentManager {
Preconditions.checkState(!_isStopping, "Segment manager is stopping");
String realtimeTableName = tableConfig.getTableName();
- int numPartitions = getNumPartitions(streamConfig);
HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
assert idealState != null;
if (idealState.isEnabled()) {
+ List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
+ getCurrentPartitionGroupMetadataList(idealState);
+ int numPartitions = getPartitionGroupMetadataList(streamConfig, currentPartitionGroupMetadataList).size();
return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, numPartitions);
} else {
LOGGER.info("Skipping LLC segments validation for disabled table: {}", realtimeTableName);
@@ -1085,7 +1092,7 @@ public class PinotLLCRealtimeSegmentManager {
String previousConsumingSegment = null;
for (Map.Entry<String, Map<String, String>> segmentEntry : instanceStatesMap.entrySet()) {
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentEntry.getKey());
- if (llcSegmentName.getPartitionId() == partitionId && segmentEntry.getValue()
+ if (llcSegmentName.getPartitionGroupId() == partitionId && segmentEntry.getValue()
.containsValue(SegmentStateModel.CONSUMING)) {
previousConsumingSegment = llcSegmentName.getSegmentName();
break;
@@ -1110,7 +1117,7 @@ public class PinotLLCRealtimeSegmentManager {
for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
if (!latestSegmentZKMetadataMap.containsKey(partitionId)) {
String newSegmentName =
- setupNewPartition(tableConfig, streamConfig, partitionId, currentTimeMs, instancePartitions, numPartitions,
+ setupNewPartitionGroup(tableConfig, streamConfig, partitionId, currentTimeMs, instancePartitions, numPartitions,
numReplicas);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
instancePartitionsMap);
@@ -1121,7 +1128,7 @@ public class PinotLLCRealtimeSegmentManager {
}
private LLCSegmentName getNextLLCSegmentName(LLCSegmentName lastLLCSegmentName, long creationTimeMs) {
- return new LLCSegmentName(lastLLCSegmentName.getTableName(), lastLLCSegmentName.getPartitionId(),
+ return new LLCSegmentName(lastLLCSegmentName.getTableName(), lastLLCSegmentName.getPartitionGroupId(),
lastLLCSegmentName.getSequenceNumber() + 1, creationTimeMs);
}
@@ -1129,21 +1136,21 @@ public class PinotLLCRealtimeSegmentManager {
* Sets up a new partition.
* <p>Persists the ZK metadata for the first CONSUMING segment, and returns the segment name.
*/
- private String setupNewPartition(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, int partitionId,
- long creationTimeMs, InstancePartitions instancePartitions, int numPartitions, int numReplicas) {
+ private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, int partitionGroupId,
+ long creationTimeMs, InstancePartitions instancePartitions, int numPartitionGroups, int numReplicas) {
String realtimeTableName = tableConfig.getTableName();
- LOGGER.info("Setting up new partition: {} for table: {}", partitionId, realtimeTableName);
+ LOGGER.info("Setting up new partition group: {} for table: {}", partitionGroupId, realtimeTableName);
String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
LLCSegmentName newLLCSegmentName =
- new LLCSegmentName(rawTableName, partitionId, STARTING_SEQUENCE_NUMBER, creationTimeMs);
+ new LLCSegmentName(rawTableName, partitionGroupId, STARTING_SEQUENCE_NUMBER, creationTimeMs);
String newSegmentName = newLLCSegmentName.getSegmentName();
StreamPartitionMsgOffset startOffset =
- getPartitionOffset(streamConfig, streamConfig.getOffsetCriteria(), partitionId);
+ getPartitionOffset(streamConfig, streamConfig.getOffsetCriteria(), partitionGroupId);
CommittingSegmentDescriptor committingSegmentDescriptor =
new CommittingSegmentDescriptor(null, startOffset.toString(), 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
- committingSegmentDescriptor, null, instancePartitions, numPartitions, numReplicas);
+ committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas);
return newSegmentName;
}
@@ -1157,7 +1164,7 @@ public class PinotLLCRealtimeSegmentManager {
int numPartitions = 0;
for (String segmentName : idealState.getRecord().getMapFields().keySet()) {
if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
- numPartitions = Math.max(numPartitions, new LLCSegmentName(segmentName).getPartitionId() + 1);
+ numPartitions = Math.max(numPartitions, new LLCSegmentName(segmentName).getPartitionGroupId() + 1);
}
}
return numPartitions;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
index 2e73806..56ae29e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
@@ -102,7 +102,7 @@ public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpda
// less same characteristics at any one point in time).
// However, when we start a new table or change controller mastership, we can have any partition completing first.
// It is best to learn the ratio as quickly as we can, so we allow any partition to supply the value.
- if (new LLCSegmentName(newSegmentName).getPartitionId() == 0 || _latestSegmentRowsToSizeRatio == 0) {
+ if (new LLCSegmentName(newSegmentName).getPartitionGroupId() == 0 || _latestSegmentRowsToSizeRatio == 0) {
if (_latestSegmentRowsToSizeRatio > 0) {
_latestSegmentRowsToSizeRatio =
CURRENT_SEGMENT_RATIO_WEIGHT * currentRatio + PREVIOUS_SEGMENT_RATIO_WEIGHT * _latestSegmentRowsToSizeRatio;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 4888f17..743e719 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -30,6 +30,8 @@ import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
@@ -48,6 +50,7 @@ import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignme
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.controller.util.SegmentCompletionUtils;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.core.realtime.impl.fakestream.FakePartitionGroupMetadata;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -57,6 +60,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -333,7 +337,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
assertTrue(oldSegmentZKMetadataMap.containsKey(segmentName));
assertTrue(segmentZKMetadataMap.containsKey(segmentName));
assertEquals(segmentZKMetadataMap.get(segmentName), oldSegmentZKMetadataMap.get(segmentName));
- oldNumPartitions = Math.max(oldNumPartitions, new LLCSegmentName(segmentName).getPartitionId() + 1);
+ oldNumPartitions = Math.max(oldNumPartitions, new LLCSegmentName(segmentName).getPartitionGroupId() + 1);
}
// Check that for new partitions, each partition should have exactly 1 new segment in CONSUMING state, and metadata
@@ -341,7 +345,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<Integer, List<String>> partitionIdToSegmentsMap = new HashMap<>();
for (Map.Entry<String, Map<String, String>> entry : instanceStatesMap.entrySet()) {
String segmentName = entry.getKey();
- int partitionId = new LLCSegmentName(segmentName).getPartitionId();
+ int partitionId = new LLCSegmentName(segmentName).getPartitionGroupId();
partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(segmentName);
}
for (int partitionId = oldNumPartitions; partitionId < segmentManager._numPartitions; partitionId++) {
@@ -579,7 +583,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) || instanceStateMap.containsValue(
SegmentStateModel.CONSUMING)) {
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
- int partitionsId = llcSegmentName.getPartitionId();
+ int partitionsId = llcSegmentName.getPartitionGroupId();
Map<Integer, String> sequenceNumberToSegmentMap = partitionIdToSegmentsMap.get(partitionsId);
int sequenceNumber = llcSegmentName.getSequenceNumber();
assertFalse(sequenceNumberToSegmentMap.containsKey(sequenceNumber));
@@ -910,12 +914,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
}
@Override
- int getNumPartitions(StreamConfig streamConfig) {
- return _numPartitions;
+ List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
+ return IntStream.range(0, _numPartitions).mapToObj(FakePartitionGroupMetadata::new).collect(Collectors.toList());
}
@Override
- LongMsgOffset getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria, int partitionId) {
+ LongMsgOffset getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria, int partitionGroupId) {
// The criteria for this test should always be SMALLEST (for default streaming config and new added partitions)
assertTrue(offsetCriteria.isSmallest());
return PARTITION_OFFSET;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 84d4592..13a9ab2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -214,7 +215,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// Semaphore for each partitionId only, which is to prevent two different Kafka consumers
// from consuming with the same partitionId in parallel in the same host.
// See the comments in {@link RealtimeTableDataManager}.
- private final Semaphore _partitionConsumerSemaphore;
+ private final Semaphore _partitionGroupConsumerSemaphore;
// A boolean flag to check whether the current thread has acquired the semaphore.
// This boolean is needed because the semaphore is shared by threads; every thread holding this semaphore can
// modify the permit. This boolean make sure the semaphore gets released only once when the partition stops consuming.
@@ -247,7 +248,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
private Thread _consumerThread;
private final String _streamTopic;
- private final int _streamPartitionId;
+ private final int _partitionGroupId;
final String _clientId;
private final LLCSegmentName _llcSegmentName;
private final RecordTransformer _recordTransformer;
@@ -705,7 +706,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
@Override
public Map<String, String> getPartitionToCurrentOffset() {
Map<String, String> partitionToCurrentOffset = new HashMap<>();
- partitionToCurrentOffset.put(String.valueOf(_streamPartitionId), _currentOffset.toString());
+ partitionToCurrentOffset.put(String.valueOf(_partitionGroupId), _currentOffset.toString());
return partitionToCurrentOffset;
}
@@ -730,8 +731,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
}
@VisibleForTesting
- protected Semaphore getPartitionConsumerSemaphore() {
- return _partitionConsumerSemaphore;
+ protected Semaphore getPartitionGroupConsumerSemaphore() {
+ return _partitionGroupConsumerSemaphore;
}
@VisibleForTesting
@@ -892,7 +893,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
closePartitionLevelConsumer();
closeStreamMetadataProvider();
if (_acquiredConsumerSemaphore.compareAndSet(true, false)) {
- _partitionConsumerSemaphore.release();
+ _partitionGroupConsumerSemaphore.release();
}
}
@@ -1102,7 +1103,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// If the transition is OFFLINE to ONLINE, the caller should have downloaded the segment and we don't reach here.
public LLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata, TableConfig tableConfig,
RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig,
- Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionConsumerSemaphore, ServerMetrics serverMetrics,
+ Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionGroupConsumerSemaphore, ServerMetrics serverMetrics,
@Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
_segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
_segmentZKMetadata = (LLCRealtimeSegmentZKMetadata) segmentZKMetadata;
@@ -1129,10 +1130,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_streamTopic = _partitionLevelStreamConfig.getTopicName();
_segmentNameStr = _segmentZKMetadata.getSegmentName();
_llcSegmentName = llcSegmentName;
- _streamPartitionId = _llcSegmentName.getPartitionId();
- _partitionConsumerSemaphore = partitionConsumerSemaphore;
+ _partitionGroupId = _llcSegmentName.getPartitionGroupId();
+ _partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
_acquiredConsumerSemaphore = new AtomicBoolean(false);
- _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _streamPartitionId;
+ _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _partitionGroupId;
segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
_tableStreamName = _tableNameWithType + "_" + _streamTopic;
_memoryManager = getMemoryManager(realtimeTableDataManager.getConsumerDir(), _segmentNameStr,
@@ -1210,14 +1211,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// Create message decoder
Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema);
_messageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
- _clientId = _streamTopic + "-" + _streamPartitionId;
+ _clientId = _streamTopic + "-" + _partitionGroupId;
// Create record transformer
_recordTransformer = CompositeTransformer.getDefaultTransformer(tableConfig, schema);
// Acquire semaphore to create Kafka consumers
try {
- _partitionConsumerSemaphore.acquire();
+ _partitionGroupConsumerSemaphore.acquire();
_acquiredConsumerSemaphore.set(true);
} catch (InterruptedException e) {
String errorMsg = "InterruptedException when acquiring the partitionConsumerSemaphore";
@@ -1243,7 +1244,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// long as the partition function is not changed.
int numPartitions = columnPartitionConfig.getNumPartitions();
try {
- int numStreamPartitions = _streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs=*/5000L);
+ // fixme: get this from ideal state
+ int numStreamPartitions = _streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 5000).size();
if (numStreamPartitions != numPartitions) {
segmentLogger.warn(
"Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions",
@@ -1261,7 +1263,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
realtimeSegmentConfigBuilder.setPartitionColumn(partitionColumn);
realtimeSegmentConfigBuilder
.setPartitionFunction(PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions));
- realtimeSegmentConfigBuilder.setPartitionId(_streamPartitionId);
+ realtimeSegmentConfigBuilder.setPartitionId(_partitionGroupId);
} else {
segmentLogger.warn("Cannot partition on multiple columns: {}", columnPartitionMap.keySet());
}
@@ -1313,7 +1315,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
closePartitionLevelConsumer();
}
segmentLogger.info("Creating new stream consumer, reason: {}", reason);
- _partitionLevelConsumer = _streamConsumerFactory.createPartitionLevelConsumer(_clientId, _streamPartitionId);
+ _partitionLevelConsumer = _streamConsumerFactory.createPartitionLevelConsumer(_clientId, _partitionGroupId);
}
/**
@@ -1325,7 +1327,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
closeStreamMetadataProvider();
}
segmentLogger.info("Creating new stream metadata provider, reason: {}", reason);
- _streamMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _streamPartitionId);
+ _streamMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _partitionGroupId);
}
// This should be done during commit? We may not always commit when we build a segment....
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 33283b9..9850048 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
+import org.apache.helix.model.IdealState;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
@@ -89,7 +90,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
// In some streams, it's possible that having multiple consumers (with the same consumer name on the same host) consuming from the same stream partition can lead to bugs.
// The semaphores will stay in the hash map even if the consuming partitions move to a different host.
// We expect that there will be a small number of semaphores, but that may be ok.
- private final Map<Integer, Semaphore> _partitionIdToSemaphoreMap = new ConcurrentHashMap<>();
+ private final Map<Integer, Semaphore> _partitionGroupIdToSemaphoreMap = new ConcurrentHashMap<>();
// The old name of the stats file used to be stats.ser which we changed when we moved all packages
// from com.linkedin to org.apache because of not being able to deserialize the old files using the newer classes
@@ -274,7 +275,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
llcSegmentName = new LLCSegmentName(segmentName);
if (_tableUpsertMetadataManager != null) {
partitionUpsertMetadataManager =
- _tableUpsertMetadataManager.getOrCreatePartitionManager(llcSegmentName.getPartitionId());
+ _tableUpsertMetadataManager.getOrCreatePartitionManager(llcSegmentName.getPartitionGroupId());
}
}
@@ -307,11 +308,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
}
// Generates only one semaphore for every partitionId
- int partitionId = llcSegmentName.getPartitionId();
- _partitionIdToSemaphoreMap.putIfAbsent(partitionId, new Semaphore(1));
+ int partitionGroupId = llcSegmentName.getPartitionGroupId();
+ _partitionGroupIdToSemaphoreMap.putIfAbsent(partitionGroupId, new Semaphore(1));
manager =
new LLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig, this, _indexDir.getAbsolutePath(),
- indexLoadingConfig, schema, llcSegmentName, _partitionIdToSemaphoreMap.get(partitionId), _serverMetrics,
+ indexLoadingConfig, schema, llcSegmentName, _partitionGroupIdToSemaphoreMap.get(partitionGroupId), _serverMetrics,
partitionUpsertMetadataManager);
}
_logger.info("Initialize RealtimeSegmentDataManager - " + segmentName);
@@ -336,7 +337,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
columnToReaderMap.put(_timeColumnName, new PinotSegmentColumnReader(immutableSegment, _timeColumnName));
int numTotalDocs = immutableSegment.getSegmentMetadata().getTotalDocs();
String segmentName = immutableSegment.getSegmentName();
- int partitionId = new LLCSegmentName(immutableSegment.getSegmentName()).getPartitionId();
+ int partitionId = new LLCSegmentName(immutableSegment.getSegmentName()).getPartitionGroupId();
PartitionUpsertMetadataManager partitionUpsertMetadataManager =
_tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId);
int numPrimaryKeyColumns = _primaryKeyColumns.size();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 0017c43..d09bdeb 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -719,7 +719,7 @@ public class LLRealtimeSegmentDataManagerTest {
long timeout = 10_000L;
FakeLLRealtimeSegmentDataManager firstSegmentDataManager = createFakeSegmentManager();
Assert.assertTrue(firstSegmentDataManager.getAcquiredConsumerSemaphore().get());
- Semaphore firstSemaphore = firstSegmentDataManager.getPartitionConsumerSemaphore();
+ Semaphore firstSemaphore = firstSegmentDataManager.getPartitionGroupConsumerSemaphore();
Assert.assertEquals(firstSemaphore.availablePermits(), 0);
Assert.assertFalse(firstSemaphore.hasQueuedThreads());
@@ -751,18 +751,18 @@ public class LLRealtimeSegmentDataManagerTest {
"Failed to acquire the semaphore for the second segment manager in " + timeout + "ms");
Assert.assertTrue(secondSegmentDataManager.get().getAcquiredConsumerSemaphore().get());
- Semaphore secondSemaphore = secondSegmentDataManager.get().getPartitionConsumerSemaphore();
+ Semaphore secondSemaphore = secondSegmentDataManager.get().getPartitionGroupConsumerSemaphore();
Assert.assertEquals(firstSemaphore, secondSemaphore);
Assert.assertEquals(secondSemaphore.availablePermits(), 0);
Assert.assertFalse(secondSemaphore.hasQueuedThreads());
// Call destroy method the 2nd time on the first segment manager, the permits in semaphore won't increase.
firstSegmentDataManager.destroy();
- Assert.assertEquals(firstSegmentDataManager.getPartitionConsumerSemaphore().availablePermits(), 0);
+ Assert.assertEquals(firstSegmentDataManager.getPartitionGroupConsumerSemaphore().availablePermits(), 0);
// The permit finally gets released in the Semaphore.
secondSegmentDataManager.get().destroy();
- Assert.assertEquals(secondSegmentDataManager.get().getPartitionConsumerSemaphore().availablePermits(), 1);
+ Assert.assertEquals(secondSegmentDataManager.get().getPartitionGroupConsumerSemaphore().availablePermits(), 1);
}
public static class FakeLLRealtimeSegmentDataManager extends LLRealtimeSegmentDataManager {
@@ -800,7 +800,7 @@ public class LLRealtimeSegmentDataManagerTest {
throws Exception {
super(segmentZKMetadata, tableConfig, realtimeTableDataManager, resourceDataDir,
new IndexLoadingConfig(makeInstanceDataManagerConfig(), tableConfig), schema, llcSegmentName,
- semaphoreMap.get(llcSegmentName.getPartitionId()), serverMetrics,
+ semaphoreMap.get(llcSegmentName.getPartitionGroupId()), serverMetrics,
new PartitionUpsertMetadataManager("testTable_REALTIME", 0, serverMetrics));
_state = LLRealtimeSegmentDataManager.class.getDeclaredField("_state");
_state.setAccessible(true);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionGroupMetadata.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionGroupMetadata.java
new file mode 100644
index 0000000..78ee12c
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionGroupMetadata.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the
+ * License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.fakestream;
+
+import org.apache.pinot.spi.stream.Checkpoint;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+
+
+/**
+ * Fake {@link PartitionGroupMetadata} for tests. Only the group id is meaningful; checkpoints and
+ * serialization are no-ops.
+ */
+public class FakePartitionGroupMetadata implements PartitionGroupMetadata {
+
+  private final int _groupId;
+
+  public FakePartitionGroupMetadata(int groupId) {
+    _groupId = groupId;
+  }
+
+  @Override
+  public int getGroupId() {
+    // Return the stored id (was: "return getGroupId()", which recursed infinitely)
+    return _groupId;
+  }
+
+  @Override
+  public Checkpoint getStartCheckpoint() {
+    return null;
+  }
+
+  @Override
+  public Checkpoint getEndCheckpoint() {
+    return null;
+  }
+
+  @Override
+  public void setStartCheckpoint(Checkpoint startCheckpoint) {
+    // no-op for the fake stream
+  }
+
+  @Override
+  public void setEndCheckpoint(Checkpoint endCheckpoint) {
+    // no-op for the fake stream
+  }
+
+  @Override
+  public byte[] serialize() {
+    return new byte[0];
+  }
+
+  @Override
+  public PartitionGroupMetadata deserialize(byte[] blob) {
+    return null;
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index 9669223..289b226 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.core.realtime.impl.fakestream;
-import java.util.List;
import java.util.Set;
import org.apache.pinot.core.util.IngestionUtils;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -69,14 +68,9 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
return new FakeStreamMetadataProvider(_streamConfig);
}
- @Override
- public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
- List<PartitionGroupMetadata> currentPartitionGroupsMetadata) {
- return null;
- }
@Override
- public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) {
+ public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) {
return null;
}
@@ -93,7 +87,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
// stream metadata provider
StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(clientId);
- int partitionCount = streamMetadataProvider.fetchPartitionCount(10_000);
+ int partitionCount = streamMetadataProvider.getPartitionGroupMetadataList(null, 10_000).size();
System.out.println(partitionCount);
// Partition metadata provider
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
index e0b8ebd..c96d06a 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
@@ -19,9 +19,12 @@
package org.apache.pinot.core.realtime.impl.fakestream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -31,7 +34,7 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
* StreamMetadataProvider implementation for the fake stream
*/
public class FakeStreamMetadataProvider implements StreamMetadataProvider {
- private int _numPartitions;
+ private final int _numPartitions;
public FakeStreamMetadataProvider(StreamConfig streamConfig) {
_numPartitions = FakeStreamConfigUtils.getNumPartitions(streamConfig);
@@ -42,6 +45,16 @@ public class FakeStreamMetadataProvider implements StreamMetadataProvider {
return _numPartitions;
}
+ @Override
+ public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
+ List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) {
+ List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
+ for (int i = 0; i < _numPartitions; i++) {
+ partitionGroupMetadataList.add(new FakePartitionGroupMetadata(i));
+ }
+ return partitionGroupMetadataList;
+ }
+
public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) throws TimeoutException {
throw new UnsupportedOperationException("This method is deprecated");
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
index 808a464..d917d73 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
@@ -19,7 +19,6 @@
package org.apache.pinot.integration.tests;
import java.lang.reflect.Constructor;
-import java.util.List;
import java.util.Random;
import java.util.Set;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -122,13 +121,7 @@ public class FlakyConsumerRealtimeClusterIntegrationTest extends RealtimeCluster
}
@Override
- public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
- List<PartitionGroupMetadata> currentPartitionGroupsMetadata) {
- return null;
- }
-
- @Override
- public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) {
+ public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) {
return null;
}
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
index cd4f9b3..0196bde 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
@@ -165,7 +165,7 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
assertNotNull(columnPartitionMetadata);
assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
assertEquals(columnPartitionMetadata.getNumPartitions(), 2);
- int streamPartitionId = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionId();
+ int streamPartitionId = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionGroupId();
assertEquals(columnPartitionMetadata.getPartitions(), Collections.singleton(streamPartitionId));
numSegmentsForPartition[streamPartitionId]++;
}
@@ -236,7 +236,7 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
assertNotNull(columnPartitionMetadata);
assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
assertEquals(columnPartitionMetadata.getNumPartitions(), 2);
- int streamPartitionId = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionId();
+ int streamPartitionId = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionGroupId();
numSegmentsForPartition[streamPartitionId]++;
if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
@@ -313,7 +313,7 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
assertNotNull(columnPartitionMetadata);
assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
assertEquals(columnPartitionMetadata.getNumPartitions(), 2);
- int streamPartitionId = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionId();
+ int streamPartitionId = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionGroupId();
numSegmentsForPartition[streamPartitionId]++;
if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
index b8ed19d..82c282c 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.plugin.stream.kafka09;
-import java.util.List;
import java.util.Set;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
@@ -55,13 +54,7 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
}
@Override
- public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
- List<PartitionGroupMetadata> currentPartitionGroupsMetadata) {
- return null;
- }
-
- @Override
- public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) {
+ public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) {
return null;
}
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionGroupMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionGroupMetadata.java
new file mode 100644
index 0000000..1d792ac
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionGroupMetadata.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the
+ * License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka09;
+
+import org.apache.pinot.spi.stream.Checkpoint;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+
+
+/**
+ * {@link PartitionGroupMetadata} implementation for Kafka, where each partition group maps to
+ * exactly one Kafka partition. Checkpoints and serialization are not implemented yet.
+ */
+public class KafkaPartitionGroupMetadata implements PartitionGroupMetadata {
+
+  private final int _groupId;
+
+  public KafkaPartitionGroupMetadata(int partitionId) {
+    _groupId = partitionId;
+  }
+
+  @Override
+  public int getGroupId() {
+    return _groupId;
+  }
+
+  @Override
+  public Checkpoint getStartCheckpoint() {
+    return null;
+  }
+
+  @Override
+  public Checkpoint getEndCheckpoint() {
+    return null;
+  }
+
+  @Override
+  public void setStartCheckpoint(Checkpoint startCheckpoint) {
+    // not yet supported for Kafka partition groups
+  }
+
+  @Override
+  public void setEndCheckpoint(Checkpoint endCheckpoint) {
+    // not yet supported for Kafka partition groups
+  }
+
+  @Override
+  public byte[] serialize() {
+    return new byte[0];
+  }
+
+  @Override
+  public PartitionGroupMetadata deserialize(byte[] blob) {
+    return null;
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java
index 06ee697..865ae96 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java
@@ -22,9 +22,13 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
@@ -36,6 +40,7 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -84,7 +89,12 @@ public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implemen
* @return
*/
@Override
+ @Deprecated
public synchronized int fetchPartitionCount(long timeoutMillis) {
+ return fetchPartitionCountInternal(timeoutMillis);
+ }
+
+ private int fetchPartitionCountInternal(long timeoutMillis) {
int unknownTopicReplyCount = 0;
final int MAX_UNKNOWN_TOPIC_REPLY_COUNT = 10;
int kafkaErrorCount = 0;
@@ -145,6 +155,22 @@ public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implemen
throw new TimeoutException();
}
+ /**
+ * Fetch the partition group metadata list
+ * @param currentPartitionGroupsMetadata In case of Kafka, each partition group contains a single partition.
+ * Hence current partition groups are not needed to compute the new partition groups
+ */
+ @Override
+ public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
+ @Nullable List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) {
+ int partitionCount = fetchPartitionCountInternal(timeoutMillis);
+ List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>(partitionCount);
+ for (int i = 0; i < partitionCount; i++) {
+ partitionGroupMetadataList.add(new KafkaPartitionGroupMetadata(i));
+ }
+ return partitionGroupMetadataList;
+ }
+
public synchronized long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)
throws java.util.concurrent.TimeoutException {
throw new UnsupportedOperationException("The use of this method s not supported");
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
index beb82e5..fbdfdfb 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
@@ -291,7 +291,7 @@ public class KafkaPartitionLevelConsumerTest {
KafkaStreamMetadataProvider streamMetadataProvider =
new KafkaStreamMetadataProvider(clientId, streamConfig, mockKafkaSimpleConsumerFactory);
- Assert.assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 2);
+ Assert.assertEquals(streamMetadataProvider.getPartitionGroupMetadataList(null, 10000L).size(), 2);
}
@Test
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
index 806baff..c73aacb 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.plugin.stream.kafka20;
-import java.util.List;
import java.util.Set;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
@@ -52,13 +51,7 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
}
@Override
- public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
- List<PartitionGroupMetadata> currentPartitionGroupsMetadata) {
- return null;
- }
-
- @Override
- public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) {
+ public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) {
return null;
}
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionGroupMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionGroupMetadata.java
new file mode 100644
index 0000000..31ae75a
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionGroupMetadata.java
@@ -0,0 +1,48 @@
+package org.apache.pinot.plugin.stream.kafka20;
+
+import org.apache.pinot.spi.stream.Checkpoint;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+
+
+public class KafkaPartitionGroupMetadata implements PartitionGroupMetadata {
+
+ private final int _groupId;
+ public KafkaPartitionGroupMetadata(int partitionId) {
+ _groupId = partitionId;
+ }
+
+ @Override
+ public int getGroupId() {
+ return _groupId;
+ }
+
+ @Override
+ public Checkpoint getStartCheckpoint() {
+ return null;
+ }
+
+ @Override
+ public Checkpoint getEndCheckpoint() {
+ return null;
+ }
+
+ @Override
+ public void setStartCheckpoint(Checkpoint startCheckpoint) {
+
+ }
+
+ @Override
+ public void setEndCheckpoint(Checkpoint endCheckpoint) {
+
+ }
+
+ @Override
+ public byte[] serialize() {
+ return new byte[0];
+ }
+
+ @Override
+ public PartitionGroupMetadata deserialize(byte[] blob) {
+ return null;
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index c0e2041..187c61b 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -21,11 +21,15 @@ package org.apache.pinot.plugin.stream.kafka20;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -42,10 +46,27 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
}
@Override
+ @Deprecated
public int fetchPartitionCount(long timeoutMillis) {
return _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size();
}
+ /**
+ * Fetch the partitionGroupMetadata list.
+ * @param currentPartitionGroupsMetadata In case of Kafka, each partition group contains a single partition.
+ * Hence current partition groups are not needed to compute the new partition groups
+ */
+ @Override
+ public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
+ @Nullable List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) {
+ int partitionCount = _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size();
+ List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>(partitionCount);
+ for (int i = 0; i < partitionCount; i++) {
+ partitionGroupMetadataList.add(new KafkaPartitionGroupMetadata(i));
+ }
+ return partitionGroupMetadataList;
+ }
+
public synchronized long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)
throws java.util.concurrent.TimeoutException {
throw new UnsupportedOperationException("The use of this method is not supported");
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
similarity index 74%
rename from pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java
rename to pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index d523235..e1ce1a6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.spi.stream;
+import java.util.List;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,24 +27,27 @@ import org.slf4j.LoggerFactory;
/**
* Fetches the partition count of a stream using the {@link StreamMetadataProvider}
*/
-public class PartitionCountFetcher implements Callable<Boolean> {
+public class PartitionGroupMetadataFetcher implements Callable<Boolean> {
- private static final Logger LOGGER = LoggerFactory.getLogger(PartitionCountFetcher.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class);
private int _partitionCount = -1;
+ private List<PartitionGroupMetadata> _partitionGroupMetadataList;
+ private List<PartitionGroupMetadata> _currentPartitionGroupMetadata;
private final StreamConfig _streamConfig;
private StreamConsumerFactory _streamConsumerFactory;
private Exception _exception;
private final String _topicName;
- public PartitionCountFetcher(StreamConfig streamConfig) {
+ public PartitionGroupMetadataFetcher(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
_streamConfig = streamConfig;
_streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig);
_topicName = streamConfig.getTopicName();
+ _currentPartitionGroupMetadata = currentPartitionGroupMetadataList;
}
- public int getPartitionCount() {
- return _partitionCount;
+ public List<PartitionGroupMetadata> getPartitionGroupMetadataList() {
+ return _partitionGroupMetadataList;
}
public Exception getException() {
@@ -59,10 +63,10 @@ public class PartitionCountFetcher implements Callable<Boolean> {
public Boolean call()
throws Exception {
- String clientId = PartitionCountFetcher.class.getSimpleName() + "-" + _topicName;
+ String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-" + _topicName;
try (
StreamMetadataProvider streamMetadataProvider = _streamConsumerFactory.createStreamMetadataProvider(clientId)) {
- _partitionCount = streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs=*/5000L);
+ _partitionGroupMetadataList = streamMetadataProvider.getPartitionGroupMetadataList(_currentPartitionGroupMetadata, /*maxWaitTimeMs=*/5000L);
if (_exception != null) {
// We had at least one failure, but succeeded now. Log an info
- LOGGER.info("Successfully retrieved partition count as {} for topic {}", _partitionCount, _topicName);
+ LOGGER.info("Successfully retrieved partition group metadata {} for topic {}", _partitionGroupMetadataList, _topicName);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java
index 1d50160..b92f04d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java
@@ -33,16 +33,16 @@ public class PartitionOffsetFetcher implements Callable<Boolean> {
private final String _topicName;
private final OffsetCriteria _offsetCriteria;
- private final int _partitionId;
+ private final int _partitionGroupId;
private Exception _exception = null;
private StreamPartitionMsgOffset _offset;
private StreamConsumerFactory _streamConsumerFactory;
StreamConfig _streamConfig;
- public PartitionOffsetFetcher(final OffsetCriteria offsetCriteria, int partitionId, StreamConfig streamConfig) {
+ public PartitionOffsetFetcher(final OffsetCriteria offsetCriteria, int partitionGroupId, StreamConfig streamConfig) {
_offsetCriteria = offsetCriteria;
- _partitionId = partitionId;
+ _partitionGroupId = partitionGroupId;
_streamConfig = streamConfig;
_streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
_topicName = streamConfig.getTopicName();
@@ -64,18 +64,19 @@ public class PartitionOffsetFetcher implements Callable<Boolean> {
@Override
public Boolean call()
throws Exception {
- String clientId = PartitionOffsetFetcher.class.getSimpleName() + "-" + _topicName + "-" + _partitionId;
+ String clientId = PartitionOffsetFetcher.class.getSimpleName() + "-" + _topicName + "-" + _partitionGroupId;
try (StreamMetadataProvider streamMetadataProvider = _streamConsumerFactory
- .createPartitionMetadataProvider(clientId, _partitionId)) {
+ .createPartitionMetadataProvider(clientId, _partitionGroupId)) {
_offset =
streamMetadataProvider.fetchStreamPartitionOffset(_offsetCriteria, STREAM_PARTITION_OFFSET_FETCH_TIMEOUT_MILLIS);
if (_exception != null) {
LOGGER.info("Successfully retrieved offset({}) for stream topic {} partition {}", _offset, _topicName,
- _partitionId);
+ _partitionGroupId);
}
return Boolean.TRUE;
} catch (TransientConsumerException e) {
- LOGGER.warn("Temporary exception when fetching offset for topic {} partition {}:{}", _topicName, _partitionId,
+ LOGGER.warn("Temporary exception when fetching offset for topic {} partition {}:{}", _topicName,
+ _partitionGroupId,
e.getMessage());
_exception = e;
return Boolean.FALSE;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index 4db0fb1..9caf61b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.spi.stream;
-import java.util.List;
import java.util.Set;
@@ -42,6 +41,7 @@ public abstract class StreamConsumerFactory {
* @param partition the partition id of the partition for which this consumer is being created
* @return
*/
+ @Deprecated
public abstract PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition);
/**
@@ -74,10 +74,6 @@ public abstract class StreamConsumerFactory {
return new LongMsgOffsetFactory();
}
- // takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new state
- public abstract List<PartitionGroupMetadata> getPartitionGroupMetadataList(
- List<PartitionGroupMetadata> currentPartitionGroupsMetadata);
-
// creates a consumer which consumes from a partition group
- public abstract PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata);
+ public abstract PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata);
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 557ffc4..5b9104e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -19,6 +19,7 @@
package org.apache.pinot.spi.stream;
import java.io.Closeable;
+import java.util.List;
import javax.annotation.Nonnull;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.annotations.InterfaceStability;
@@ -32,11 +33,15 @@ import org.apache.pinot.spi.annotations.InterfaceStability;
public interface StreamMetadataProvider extends Closeable {
/**
* Fetches the number of partitions for a topic given the stream configs
- * @param timeoutMillis
- * @return
+ * @deprecated use getPartitionGroupMetadataList instead
*/
+ @Deprecated
int fetchPartitionCount(long timeoutMillis);
+ // takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new state
+ List<PartitionGroupMetadata> getPartitionGroupMetadataList(
+ List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis);
+
// Issue 5953 Retain this interface for 0.5.0, remove in 0.6.0
@Deprecated
long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org