You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/01/08 01:42:40 UTC
[incubator-pinot] branch sharded_consumer_type_support_with_kinesis updated: Remove new partition groups creation in commit
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/sharded_consumer_type_support_with_kinesis by this push:
new f39dbb7 Remove new partition groups creation in commit
f39dbb7 is described below
commit f39dbb71d1981b3265614fce17133b11675c7cfe
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Thu Jan 7 17:42:23 2021 -0800
Remove new partition groups creation in commit
---
.../realtime/PinotLLCRealtimeSegmentManager.java | 57 +++++++++-------------
.../realtime/LLRealtimeSegmentDataManager.java | 3 +-
2 files changed, 23 insertions(+), 37 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 8bf9cd0..5fd5c3f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -469,6 +469,8 @@ public class PinotLLCRealtimeSegmentManager {
private void commitSegmentMetadataInternal(String realtimeTableName,
CommittingSegmentDescriptor committingSegmentDescriptor) {
String committingSegmentName = committingSegmentDescriptor.getSegmentName();
+ LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
+ int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId();
LOGGER.info("Committing segment metadata for segment: {}", committingSegmentName);
TableConfig tableConfig = getTableConfig(realtimeTableName);
@@ -495,51 +497,40 @@ public class PinotLLCRealtimeSegmentManager {
// Step-2
- // Say we currently were consuming from 2 shards A, B. Of those, A is the one committing.
+ // Example: Say we currently were consuming from 2 shards A, B. Of those, A is the one committing.
- // get current partition groups - this gives current state of latest segments for each partition [A - DONE], [B - IN_PROGRESS]
+ // Get current partition groups - this gives current state of latest segments for each partition [A - DONE], [B - IN_PROGRESS]
List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
IngestionConfigUtils.getStreamConfigMap(tableConfig));
- // find new partition groups [A],[B],[C],[D] (assume A split into C D)
+ // Find new partition groups [A],[B],[C],[D] (assume A split into C D)
// If segment has consumed all of A, we will receive B,C,D
// If segment is still not reached last msg of A, we will receive A,B,C,D
+ // If there were no splits/merges we would receive A,B
List<PartitionGroupInfo> newPartitionGroupInfoList =
getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
int numPartitions = newPartitionGroupInfoList.size();
- // create new segment metadata, only if PartitionGroupInfo was returned for it in the newPartitionGroupInfoList
- Map<Integer, PartitionGroupMetadata> currentGroupIdToMetadata = currentPartitionGroupMetadataList.stream().collect(
- Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p));
-
- List<String> newConsumingSegmentNames = new ArrayList<>();
+ // Only if committingSegment's partitionGroup is present in the newPartitionGroupInfoList, we create new segment metadata
+ String newConsumingSegmentName = null;
String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
long newSegmentCreationTimeMs = getCurrentTimeMs();
for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) {
- int newPartitionGroupId = partitionGroupInfo.getPartitionGroupId();
- PartitionGroupMetadata currentPartitionGroupMetadata = currentGroupIdToMetadata.get(newPartitionGroupId);
- if (currentPartitionGroupMetadata == null) { // not present in current state. New partition found.
- // make new segment
- // fixme: letting validation manager do this would be best, otherwise we risk creating multiple CONSUMING segments
- String newLLCSegmentName =
- setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo, newSegmentCreationTimeMs,
- instancePartitions, numPartitions, numReplicas);
- newConsumingSegmentNames.add(newLLCSegmentName);
- } else {
- LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
- // Update this only for committing segment. All other partitions should get updated by their own commit call
- if (newPartitionGroupId == committingLLCSegment.getPartitionGroupId()) {
- Preconditions.checkState(currentPartitionGroupMetadata.getStatus().equals(Status.DONE.toString()));
- LLCSegmentName newLLCSegmentName = new LLCSegmentName(rawTableName, newPartitionGroupId,
- currentPartitionGroupMetadata.getSequenceNumber() + 1, newSegmentCreationTimeMs);
- createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, newSegmentCreationTimeMs,
- committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
- newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName());
- }
+ if (partitionGroupInfo.getPartitionGroupId() == committingSegmentPartitionGroupId) {
+ LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
+ committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
+ createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
+ committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
+ newConsumingSegmentName = newLLCSegment.getSegmentName();
+ break;
}
}
+ // TODO: create new partition groups also here
+ // Cannot do it at the moment, because of the timestamp suffix on the segment name.
+ // Different committing segments could create a CONSUMING segment for same new partitionGroup, with different name
+
// Step-3
SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
@@ -554,7 +545,7 @@ public class PinotLLCRealtimeSegmentManager {
Lock lock = _idealStateUpdateLocks[lockIndex];
try {
lock.lock();
- updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentName, newConsumingSegmentNames,
+ updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentName, newConsumingSegmentName,
segmentAssignment, instancePartitionsMap);
} finally {
lock.unlock();
@@ -846,7 +837,7 @@ public class PinotLLCRealtimeSegmentManager {
*/
@VisibleForTesting
void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName,
- List<String> newSegmentNames, SegmentAssignment segmentAssignment,
+ String newSegmentName, SegmentAssignment segmentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
assert idealState != null;
@@ -863,11 +854,7 @@ public class PinotLLCRealtimeSegmentManager {
"Exceeded max segment completion time for segment " + committingSegmentName);
}
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), committingSegmentName,
- null, segmentAssignment, instancePartitionsMap);
- for (String newSegmentName : newSegmentNames) {
- updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), null,
- newSegmentName, segmentAssignment, instancePartitionsMap);
- }
+ newSegmentName, segmentAssignment, instancePartitionsMap);
return idealState;
}, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 40b49b8..1569d8e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -307,11 +307,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT;
return true;
} else if (_endOfPartitionGroup) {
+ // FIXME: handle numDocsIndexed == 0 case
segmentLogger.info("Stopping consumption due to end of partitionGroup reached nRows={} numRowsIndexed={}, numRowsConsumed={}",
_numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
_stopReason = SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP;
- // fixme: what happens if reached endOfPartitionGroup but numDocsIndexed == 0
- // If we decide to only setupNewPartitions via ValidationManager, we don't need commit on endOfShard
return true;
}
return false;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org