You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2019/04/24 22:39:59 UTC

[incubator-pinot] branch master updated: Add multiple locks which can be obtained based on segment name in lookupOrCreateFSM (#4159)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8464997  Add multiple locks which can be obtained based on segment name in lookupOrCreateFSM (#4159)
8464997 is described below

commit 8464997626272559ae9822045f3ea7bd5cef5785
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Wed Apr 24 15:39:54 2019 -0700

    Add multiple locks which can be obtained based on segment name in lookupOrCreateFSM (#4159)
---
 .../core/realtime/SegmentCompletionManager.java    | 49 +++++++++++++++-------
 1 file changed, 33 insertions(+), 16 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index cfdd3c3..2613570 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -19,12 +19,15 @@
 package org.apache.pinot.controller.helix.core.realtime;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.pinot.common.config.TableNameBuilder;
@@ -71,6 +74,8 @@ public class SegmentCompletionManager {
   private final PinotLLCRealtimeSegmentManager _segmentManager;
   private final ControllerMetrics _controllerMetrics;
   private final ControllerLeadershipManager _controllerLeadershipManager;
+  private final Lock[] _fsmLocks;
+  private static final int NUM_FSM_LOCKS = 20;
 
   // Half hour max commit time for all segments
   private static final int MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS = 1800;
@@ -90,6 +95,10 @@ public class SegmentCompletionManager {
     _controllerLeadershipManager = controllerLeadershipManager;
     SegmentCompletionProtocol
         .setMaxSegmentCommitTimeMs(TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds, TimeUnit.SECONDS));
+    _fsmLocks = new Lock[NUM_FSM_LOCKS];
+    for (int i = 0; i < NUM_FSM_LOCKS; i ++) {
+      _fsmLocks[i] = new ReentrantLock();
+    }
   }
 
   public boolean isSplitCommitEnabled() {
@@ -104,14 +113,20 @@ public class SegmentCompletionManager {
     return System.currentTimeMillis();
   }
 
-  // We need to make sure that we never create multiple FSMs for the same segment, so this method must be synchronized.
-  private synchronized SegmentCompletionFSM lookupOrCreateFsm(final LLCSegmentName segmentName, String msgType) {
+  // We need to make sure that we never create multiple FSMs for the same segment.
+  // The lock is picked by hashing the segment name, so requests for the same segment are serialized.
+  private SegmentCompletionFSM lookupOrCreateFsm(final LLCSegmentName segmentName, String msgType) {
     final String segmentNameStr = segmentName.getSegmentName();
-    SegmentCompletionFSM fsm = _fsmMap.get(segmentNameStr);
-    if (fsm == null) {
-      // Look up propertystore to see if this is a completed segment
-      ZNRecord segment;
-      try {
+
+    int lockIndex = (segmentNameStr.hashCode() & Integer.MAX_VALUE) % NUM_FSM_LOCKS;
+    Lock lock = _fsmLocks[lockIndex];
+
+    SegmentCompletionFSM fsm;
+    try {
+      lock.lock();
+      fsm = _fsmMap.get(segmentNameStr);
+      if (fsm == null) {
+        // Look up propertystore to see if this is a completed segment
         // TODO if we keep a list of last few committed segments, we don't need to go to zk for this.
         final String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(segmentName.getTableName());
         LLCRealtimeSegmentZKMetadata segmentMetadata =
@@ -121,22 +136,24 @@ public class SegmentCompletionManager {
           // Also good for synchronization, because it is possible that multiple threads take this path, and we don't want
           // multiple instances of the FSM to be created for the same commit sequence at the same time.
           final long endOffset = segmentMetadata.getEndOffset();
-          fsm = SegmentCompletionFSM
-              .fsmInCommit(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas(), endOffset);
+          fsm = SegmentCompletionFSM.fsmInCommit(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas(),
+              endOffset);
         } else if (msgType.equals(SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING)) {
-          fsm = SegmentCompletionFSM
-              .fsmStoppedConsuming(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas());
+          fsm = SegmentCompletionFSM.fsmStoppedConsuming(_segmentManager, this, segmentName,
+              segmentMetadata.getNumReplicas());
         } else {
           // Segment is in the process of completing, and this is the first one to respond. Create fsm
           fsm = SegmentCompletionFSM.fsmInHolding(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas());
         }
         LOGGER.info("Created FSM {}", fsm);
         _fsmMap.put(segmentNameStr, fsm);
-      } catch (Exception e) {
-        // Server gone wonky. Segment does not exist in propstore
-        LOGGER.error("Exception creating FSM for segment {}", segmentNameStr, e);
-        throw new RuntimeException("Exception creating FSM for segment " + segmentNameStr, e);
       }
+    } catch (Exception e) {
+      // Server gone wonky. Segment does not exist in propstore
+      LOGGER.error("Exception getting FSM for segment {}", segmentNameStr, e);
+      throw new RuntimeException("Exception getting FSM for segment " + segmentNameStr, e);
+    } finally {
+      lock.unlock();
     }
     return fsm;
   }
@@ -162,7 +179,7 @@ public class SegmentCompletionManager {
       fsm = lookupOrCreateFsm(segmentName, SegmentCompletionProtocol.MSG_TYPE_CONSUMED);
       response = fsm.segmentConsumed(instanceId, offset, stopReason);
     } catch (Exception e) {
-      // Return failed response
+      LOGGER.error("Caught exception in segmentConsumed for segment {}", segmentNameStr, e);
     }
     if (fsm != null && fsm.isDone()) {
       LOGGER.info("Removing FSM (if present):{}", fsm.toString());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org