You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/02/06 00:59:29 UTC

[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6518: Kinesis Connector

mcvsubbu commented on a change in pull request #6518:
URL: https://github.com/apache/incubator-pinot/pull/6518#discussion_r571326647



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -449,13 +497,40 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
     _helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, committingSegmentName, false, true);
 
     // Step-2
+
+    // Example: Say we currently were consuming from 2 shards A, B. Of those, A is the one committing.
+
+    // Get current partition groups - this gives current state of latest segments for each partition [A - DONE], [B - IN_PROGRESS]
+    PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+        IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
+        getCurrentPartitionGroupMetadataList(idealState, streamConfig);
+
+    // Find new partition groups [A],[B],[C],[D] (assume A split into C D)
+    // If segment has consumed all of A, we will receive B,C,D
+    // If segment is still not reached last msg of A, we will receive A,B,C,D
+    // If there were no splits/merges we would receive A,B
+    List<PartitionGroupInfo> newPartitionGroupInfoList =
+        getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
+    Set<Integer> newPartitionGroupSet =
+        newPartitionGroupInfoList.stream().map(PartitionGroupInfo::getPartitionGroupId).collect(Collectors.toSet());
+    int numPartitions = newPartitionGroupInfoList.size();
+
+    // Only if committingSegment's partitionGroup is present in the newPartitionGroupInfoList, we create new segment metadata
+    String newConsumingSegmentName = null;
+    String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
     long newSegmentCreationTimeMs = getCurrentTimeMs();
-    LLCSegmentName newLLCSegmentName =
-        getNextLLCSegmentName(new LLCSegmentName(committingSegmentName), newSegmentCreationTimeMs);
-    createNewSegmentZKMetadata(tableConfig,
-        new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)),
-        newLLCSegmentName, newSegmentCreationTimeMs, committingSegmentDescriptor, committingSegmentZKMetadata,
-        instancePartitions, numPartitions, numReplicas);
+    if (newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) {
+      LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
+          committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
+      createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
+          committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
+      newConsumingSegmentName = newLLCSegment.getSegmentName();
+    }
+
+    // TODO: Also, create new partition groups here (instead of waiting for the Validation Manager)
+    //  Cannot do it at the moment, because of the timestamp suffix on the segment name.

Review comment:
       I fixed that by adding a check against the ideal state: only one segment per (partition group ID, sequence number) combination is allowed.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -149,22 +151,22 @@ public boolean isFinal() {
   public class SegmentBuildDescriptor {
     final File _segmentTarFile;
     final Map<String, File> _metadataFileMap;
-    final StreamPartitionMsgOffset _offset;
+    final Checkpoint _offset;

Review comment:
       Why rename this to Checkpoint?

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentName.java
##########
@@ -63,7 +63,7 @@ public String getGroupId() {
     throw new RuntimeException("No groupId in " + getSegmentName());
   }
 
-  public int getPartitionId() {
+  public int getPartitionGroupId() {

Review comment:
       nit: Please change the exception message as well

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java
##########
@@ -87,11 +87,6 @@ public void setDownloadUrl(String downloadUrl) {
   public ZNRecord toZNRecord() {
     ZNRecord znRecord = super.toZNRecord();
     znRecord.setSimpleField(START_OFFSET, _startOffset);
-    if (_endOffset == null) {

Review comment:
       Please flag the commit for the release notes, and add to the commit message that, in order to use this release, all components must already have been upgraded to the previous release (or possibly to 0.5.0 — I am not sure; please verify). Thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org