You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by mc...@apache.org on 2021/11/12 23:04:37 UTC

[pinot] branch master updated: Use oldest offset on newly detected partitions (#7756)

This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 561b8a3  Use oldest offset on newly detected partitions (#7756)
561b8a3 is described below

commit 561b8a3c6fa832be3ab9e540ca3688a38a11e4c8
Author: Subbu Subramaniam <mc...@users.noreply.github.com>
AuthorDate: Fri Nov 12 15:04:18 2021 -0800

    Use oldest offset on newly detected partitions (#7756)
    
    * Use oldest offset on newly detected partitions
    
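    Fixed an issue where Pinot could lose data when it detected new stream partitions
    (depending on the table configuration). Partitions newly detected on a live table now
    start consuming from the smallest (oldest) available offset instead of the configured start offset.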
    
    Manual testing by running LLCRealtimeClusterIntegrationTest via debugger
    - Change the table config to start from the largest offset
    - Force the test to detect only one partition; notice that roughly
      half the rows are ingested and only partition 0 shows up in the ideal state
    - Run the RealtimeSegmentValidationManager job via Swagger to force detection
      of the new partition.
    - Confirmed that all rows are now present.
    
    Issue #7741
    
    * Fixed linter error
---
 .../core/realtime/PinotLLCRealtimeSegmentManager.java   | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 7f6996a..16c7471 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -316,7 +316,7 @@ public class PinotLLCRealtimeSegmentManager {
     for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
       String segmentName =
           setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
-              numPartitionGroups, numReplicas, newPartitionGroupMetadataList);
+              numPartitionGroups, numReplicas, newPartitionGroupMetadataList, false);
 
       updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
           instancePartitionsMap);
@@ -1193,7 +1193,7 @@ public class PinotLLCRealtimeSegmentManager {
       if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
         String newSegmentName =
             setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
-                numPartitions, numReplicas, newPartitionGroupMetadataList);
+                numPartitions, numReplicas, newPartitionGroupMetadataList, true);
         updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
             instancePartitionsMap);
       }
@@ -1232,10 +1232,16 @@ public class PinotLLCRealtimeSegmentManager {
    */
   private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
       PartitionGroupMetadata partitionGroupMetadata, long creationTimeMs, InstancePartitions instancePartitions,
-      int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata> partitionGroupMetadataList) {
+      int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata> partitionGroupMetadataList,
+      boolean isLiveTable) {
     String realtimeTableName = tableConfig.getTableName();
     int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
-    String startOffset = partitionGroupMetadata.getStartOffset().toString();
+    StreamPartitionMsgOffset startOffset;
+    if (isLiveTable) {
+      startOffset = getPartitionGroupSmallestOffset(streamConfig, partitionGroupId);
+    } else {
+      startOffset = partitionGroupMetadata.getStartOffset();
+    }
     LOGGER.info("Setting up new partition group: {} for table: {}", partitionGroupId, realtimeTableName);
 
     String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
@@ -1243,7 +1249,8 @@ public class PinotLLCRealtimeSegmentManager {
         new LLCSegmentName(rawTableName, partitionGroupId, STARTING_SEQUENCE_NUMBER, creationTimeMs);
     String newSegmentName = newLLCSegmentName.getSegmentName();
 
-    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, startOffset, 0);
+    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null,
+        startOffset.toString(), 0);
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
         committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas,
         partitionGroupMetadataList);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org