You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/01 22:13:45 UTC

(pinot) branch master updated: Work around the problem of Helix sending 2 transitions for CONSUMING -> DROPPED (#12351)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 33074e1e7e Work around the problem of Helix sending 2 transitions for CONSUMING -> DROPPED (#12351)
33074e1e7e is described below

commit 33074e1e7ec807251c8163d1807c61f5f80e8853
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Thu Feb 1 14:13:39 2024 -0800

    Work around the problem of Helix sending 2 transitions for CONSUMING -> DROPPED (#12351)
---
 .../SegmentOnlineOfflineStateModelFactory.java     | 40 ++++++++++++++++++++--
 1 file changed, 37 insertions(+), 3 deletions(-)

diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index 42d1642c7b..b0625c879b 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -19,6 +19,10 @@
 package org.apache.pinot.server.starter.helix;
 
 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.model.Message;
 import org.apache.helix.participant.statemachine.StateModel;
@@ -46,6 +50,13 @@ import org.slf4j.LoggerFactory;
  * 3. Delete an existed segment.
  */
 public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> {
+  // NOTE: Helix might process CONSUMING -> DROPPED transition as 2 separate transitions: CONSUMING -> OFFLINE followed
+  // by OFFLINE -> DROPPED. Use this cache to track the segments that just went through CONSUMING -> OFFLINE transition
+  // to detect CONSUMING -> DROPPED transition.
+  // TODO: Check how Helix handles the CONSUMING -> DROPPED transition and remove this cache if it's not needed.
+  private final Cache<Pair<String, String>, Boolean> _recentlyOffloadedConsumingSegments =
+      CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).build();
+
   private final String _instanceId;
   private final InstanceDataManager _instanceDataManager;
 
@@ -139,6 +150,7 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
       String segmentName = message.getPartitionName();
       try {
         _instanceDataManager.offloadSegment(realtimeTableName, segmentName);
+        _recentlyOffloadedConsumingSegments.put(Pair.of(realtimeTableName, segmentName), true);
       } catch (Exception e) {
         _logger.error("Caught exception in state transition CONSUMING -> OFFLINE for table: {}, segment: {}",
             realtimeTableName, segmentName, e);
@@ -151,12 +163,10 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
       _logger.info("SegmentOnlineOfflineStateModel.onBecomeDroppedFromConsuming() : " + message);
       String realtimeTableName = message.getResourceName();
       String segmentName = message.getPartitionName();
-      TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
-      Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName);
-      tableDataManager.onConsumingToDropped(segmentName);
       try {
         _instanceDataManager.offloadSegment(realtimeTableName, segmentName);
         _instanceDataManager.deleteSegment(realtimeTableName, segmentName);
+        onConsumingToDropped(realtimeTableName, segmentName);
       } catch (Exception e) {
         _logger.error("Caught exception in state transition CONSUMING -> DROPPED for table: {}, segment: {}",
             realtimeTableName, segmentName, e);
@@ -164,6 +174,21 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
       }
     }
 
+    /**
+     * Invokes the CONSUMING -> DROPPED callback on the table data manager, or logs a warning if it does not exist.
+     * Call only after the segment is offloaded and deleted so the table data manager can safely release resources.
+     */
+    private void onConsumingToDropped(String realtimeTableName, String segmentName) {
+      TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName);
+      if (tableDataManager == null) {
+        _logger.warn(
+            "Failed to find data manager for table: {}, skip invoking consuming to dropped callback for segment: {}",
+            realtimeTableName, segmentName);
+        return;
+      }
+      tableDataManager.onConsumingToDropped(segmentName);
+    }
+
     @Transition(from = "OFFLINE", to = "ONLINE")
     public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
       _logger.info("SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() : " + message);
@@ -215,6 +240,15 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta
       String segmentName = message.getPartitionName();
       try {
         _instanceDataManager.deleteSegment(tableNameWithType, segmentName);
+
+        // Check if the segment was recently offloaded via the CONSUMING -> OFFLINE transition
+        if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+          Pair<String, String> tableSegmentPair = Pair.of(tableNameWithType, segmentName);
+          if (_recentlyOffloadedConsumingSegments.getIfPresent(tableSegmentPair) != null) {
+            _recentlyOffloadedConsumingSegments.invalidate(tableSegmentPair);
+            onConsumingToDropped(tableNameWithType, segmentName);
+          }
+        }
       } catch (Exception e) {
         _logger.error("Caught exception in state transition OFFLINE -> DROPPED for table: {}, segment: {}",
             tableNameWithType, segmentName, e);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org