You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/01/08 01:42:40 UTC

[incubator-pinot] branch sharded_consumer_type_support_with_kinesis updated: Remove new partition groups creation in commit

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/sharded_consumer_type_support_with_kinesis by this push:
     new f39dbb7  Remove new partition groups creation in commit
f39dbb7 is described below

commit f39dbb71d1981b3265614fce17133b11675c7cfe
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Thu Jan 7 17:42:23 2021 -0800

    Remove new partition groups creation in commit
---
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 57 +++++++++-------------
 .../realtime/LLRealtimeSegmentDataManager.java     |  3 +-
 2 files changed, 23 insertions(+), 37 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 8bf9cd0..5fd5c3f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -469,6 +469,8 @@ public class PinotLLCRealtimeSegmentManager {
   private void commitSegmentMetadataInternal(String realtimeTableName,
       CommittingSegmentDescriptor committingSegmentDescriptor) {
     String committingSegmentName = committingSegmentDescriptor.getSegmentName();
+    LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
+    int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId();
     LOGGER.info("Committing segment metadata for segment: {}", committingSegmentName);
 
     TableConfig tableConfig = getTableConfig(realtimeTableName);
@@ -495,51 +497,40 @@ public class PinotLLCRealtimeSegmentManager {
 
     // Step-2
 
-    // Say we currently were consuming from 2 shards A, B. Of those, A is the one committing.
+    // Example: Say we currently were consuming from 2 shards A, B. Of those, A is the one committing.
 
-    // get current partition groups - this gives current state of latest segments for each partition [A - DONE], [B - IN_PROGRESS]
+    // Get current partition groups - this gives current state of latest segments for each partition [A - DONE], [B - IN_PROGRESS]
     List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
     PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
         IngestionConfigUtils.getStreamConfigMap(tableConfig));
 
-    // find new partition groups [A],[B],[C],[D] (assume A split into C D)
+    // Find new partition groups [A],[B],[C],[D] (assume A split into C D)
     // If segment has consumed all of A, we will receive B,C,D
     // If segment has still not reached the last msg of A, we will receive A,B,C,D
+    // If there were no splits/merges we would receive A,B
     List<PartitionGroupInfo> newPartitionGroupInfoList =
         getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
     int numPartitions = newPartitionGroupInfoList.size();
 
-    // create new segment metadata, only if PartitionGroupInfo was returned for it in the newPartitionGroupInfoList
-    Map<Integer, PartitionGroupMetadata> currentGroupIdToMetadata = currentPartitionGroupMetadataList.stream().collect(
-        Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p));
-
-    List<String> newConsumingSegmentNames = new ArrayList<>();
+    // Create new segment metadata only if the committing segment's partitionGroup is present in the newPartitionGroupInfoList
+    String newConsumingSegmentName = null;
     String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
     long newSegmentCreationTimeMs = getCurrentTimeMs();
     for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) {
-      int newPartitionGroupId = partitionGroupInfo.getPartitionGroupId();
-      PartitionGroupMetadata currentPartitionGroupMetadata = currentGroupIdToMetadata.get(newPartitionGroupId);
-      if (currentPartitionGroupMetadata == null) { // not present in current state. New partition found.
-        // make new segment
-        // fixme: letting validation manager do this would be best, otherwise we risk creating multiple CONSUMING segments
-        String newLLCSegmentName =
-            setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo, newSegmentCreationTimeMs,
-                instancePartitions, numPartitions, numReplicas);
-        newConsumingSegmentNames.add(newLLCSegmentName);
-      } else {
-        LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
-        // Update this only for committing segment. All other partitions should get updated by their own commit call
-        if (newPartitionGroupId == committingLLCSegment.getPartitionGroupId()) {
-          Preconditions.checkState(currentPartitionGroupMetadata.getStatus().equals(Status.DONE.toString()));
-          LLCSegmentName newLLCSegmentName = new LLCSegmentName(rawTableName, newPartitionGroupId,
-              currentPartitionGroupMetadata.getSequenceNumber() + 1, newSegmentCreationTimeMs);
-          createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, newSegmentCreationTimeMs,
-              committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
-          newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName());
-        }
+      if (partitionGroupInfo.getPartitionGroupId() == committingSegmentPartitionGroupId) {
+        LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
+            committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
+        createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
+            committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
+        newConsumingSegmentName = newLLCSegment.getSegmentName();
+        break;
       }
     }
 
+    // TODO: create new partition groups also here
+    //  Cannot do it at the moment, because of the timestamp suffix on the segment name.
+    //  Different committing segments could each create a CONSUMING segment for the same new partitionGroup, with different names
+
     // Step-3
     SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
     Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
@@ -554,7 +545,7 @@ public class PinotLLCRealtimeSegmentManager {
     Lock lock = _idealStateUpdateLocks[lockIndex];
     try {
       lock.lock();
-      updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentName, newConsumingSegmentNames,
+      updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentName, newConsumingSegmentName,
           segmentAssignment, instancePartitionsMap);
     } finally {
       lock.unlock();
@@ -846,7 +837,7 @@ public class PinotLLCRealtimeSegmentManager {
    */
   @VisibleForTesting
   void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName,
-      List<String> newSegmentNames, SegmentAssignment segmentAssignment,
+      String newSegmentName, SegmentAssignment segmentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
     HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
       assert idealState != null;
@@ -863,11 +854,7 @@ public class PinotLLCRealtimeSegmentManager {
             "Exceeded max segment completion time for segment " + committingSegmentName);
       }
       updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), committingSegmentName,
-          null, segmentAssignment, instancePartitionsMap);
-      for (String newSegmentName : newSegmentNames) {
-        updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), null,
-            newSegmentName, segmentAssignment, instancePartitionsMap);
-      }
+          newSegmentName, segmentAssignment, instancePartitionsMap);
       return idealState;
     }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 40b49b8..1569d8e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -307,11 +307,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           _stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT;
           return true;
         } else if (_endOfPartitionGroup) {
+          // FIXME: handle numDocsIndexed == 0 case
           segmentLogger.info("Stopping consumption due to end of partitionGroup reached nRows={} numRowsIndexed={}, numRowsConsumed={}",
               _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
           _stopReason = SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP;
-          // fixme: what happens if reached endOfPartitionGroup but numDocsIndexed == 0
-          //  If we decide to only setupNewPartitions via ValidationManager, we don't need commit on endOfShard
           return true;
         }
         return false;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org