You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2019/04/24 22:39:59 UTC
[incubator-pinot] branch master updated: Add multiple locks which can be obtained based on segment name in lookupOrCreateFSM (#4159)
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8464997 Add multiple locks which can be obtained based on segment name in lookupOrCreateFSM (#4159)
8464997 is described below
commit 8464997626272559ae9822045f3ea7bd5cef5785
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Wed Apr 24 15:39:54 2019 -0700
Add multiple locks which can be obtained based on segment name in lookupOrCreateFSM (#4159)
---
.../core/realtime/SegmentCompletionManager.java | 49 +++++++++++++++-------
1 file changed, 33 insertions(+), 16 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index cfdd3c3..2613570 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -19,12 +19,15 @@
package org.apache.pinot.controller.helix.core.realtime;
import com.google.common.annotations.VisibleForTesting;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.helix.HelixManager;
import org.apache.helix.ZNRecord;
import org.apache.pinot.common.config.TableNameBuilder;
@@ -71,6 +74,8 @@ public class SegmentCompletionManager {
private final PinotLLCRealtimeSegmentManager _segmentManager;
private final ControllerMetrics _controllerMetrics;
private final ControllerLeadershipManager _controllerLeadershipManager;
+ private final Lock[] _fsmLocks;
+ private static final int NUM_FSM_LOCKS = 20;
// Half hour max commit time for all segments
private static final int MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS = 1800;
@@ -90,6 +95,10 @@ public class SegmentCompletionManager {
_controllerLeadershipManager = controllerLeadershipManager;
SegmentCompletionProtocol
.setMaxSegmentCommitTimeMs(TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds, TimeUnit.SECONDS));
+ _fsmLocks = new Lock[NUM_FSM_LOCKS];
+ for (int i = 0; i < NUM_FSM_LOCKS; i ++) {
+ _fsmLocks[i] = new ReentrantLock();
+ }
}
public boolean isSplitCommitEnabled() {
@@ -104,14 +113,20 @@ public class SegmentCompletionManager {
return System.currentTimeMillis();
}
- // We need to make sure that we never create multiple FSMs for the same segment, so this method must be synchronized.
- private synchronized SegmentCompletionFSM lookupOrCreateFsm(final LLCSegmentName segmentName, String msgType) {
+ // We need to make sure that we never create multiple FSMs for the same segment
+ // Obtain locks based on segment name, so as to disallow same segment names entering together
+ private SegmentCompletionFSM lookupOrCreateFsm(final LLCSegmentName segmentName, String msgType) {
final String segmentNameStr = segmentName.getSegmentName();
- SegmentCompletionFSM fsm = _fsmMap.get(segmentNameStr);
- if (fsm == null) {
- // Look up propertystore to see if this is a completed segment
- ZNRecord segment;
- try {
+
+ int lockIndex = (segmentNameStr.hashCode() & Integer.MAX_VALUE) % NUM_FSM_LOCKS;
+ Lock lock = _fsmLocks[lockIndex];
+
+ SegmentCompletionFSM fsm;
+ try {
+ lock.lock();
+ fsm = _fsmMap.get(segmentNameStr);
+ if (fsm == null) {
+ // Look up propertystore to see if this is a completed segment
// TODO if we keep a list of last few committed segments, we don't need to go to zk for this.
final String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(segmentName.getTableName());
LLCRealtimeSegmentZKMetadata segmentMetadata =
@@ -121,22 +136,24 @@ public class SegmentCompletionManager {
// Also good for synchronization, because it is possible that multiple threads take this path, and we don't want
// multiple instances of the FSM to be created for the same commit sequence at the same time.
final long endOffset = segmentMetadata.getEndOffset();
- fsm = SegmentCompletionFSM
- .fsmInCommit(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas(), endOffset);
+ fsm = SegmentCompletionFSM.fsmInCommit(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas(),
+ endOffset);
} else if (msgType.equals(SegmentCompletionProtocol.MSG_TYPE_STOPPED_CONSUMING)) {
- fsm = SegmentCompletionFSM
- .fsmStoppedConsuming(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas());
+ fsm = SegmentCompletionFSM.fsmStoppedConsuming(_segmentManager, this, segmentName,
+ segmentMetadata.getNumReplicas());
} else {
// Segment is in the process of completing, and this is the first one to respond. Create fsm
fsm = SegmentCompletionFSM.fsmInHolding(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas());
}
LOGGER.info("Created FSM {}", fsm);
_fsmMap.put(segmentNameStr, fsm);
- } catch (Exception e) {
- // Server gone wonky. Segment does not exist in propstore
- LOGGER.error("Exception creating FSM for segment {}", segmentNameStr, e);
- throw new RuntimeException("Exception creating FSM for segment " + segmentNameStr, e);
}
+ } catch (Exception e) {
+ // Server gone wonky. Segment does not exist in propstore
+ LOGGER.error("Exception getting FSM for segment {}", segmentNameStr, e);
+ throw new RuntimeException("Exception getting FSM for segment " + segmentNameStr, e);
+ } finally {
+ lock.unlock();
}
return fsm;
}
@@ -162,7 +179,7 @@ public class SegmentCompletionManager {
fsm = lookupOrCreateFsm(segmentName, SegmentCompletionProtocol.MSG_TYPE_CONSUMED);
response = fsm.segmentConsumed(instanceId, offset, stopReason);
} catch (Exception e) {
- // Return failed response
+ LOGGER.error("Caught exception in segmentConsumed for segment {}", segmentNameStr, e);
}
if (fsm != null && fsm.isDone()) {
LOGGER.info("Removing FSM (if present):{}", fsm.toString());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org