You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2016/06/02 09:18:19 UTC
[1/2] lucene-solr:master: SOLR-8744: Overseer operations performed
with fine grained mutual exclusion
Repository: lucene-solr
Updated Branches:
refs/heads/master 09372acb6 -> 9863d3d2d
SOLR-8744: Overseer operations performed with fine grained mutual exclusion
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/459a9c77
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/459a9c77
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/459a9c77
Branch: refs/heads/master
Commit: 459a9c77a6a9b807deb98c58225d4d0ec1f75bac
Parents: 34d9f0a
Author: Noble Paul <no...@apache.org>
Authored: Thu Jun 2 14:47:57 2016 +0530
Committer: Noble Paul <no...@apache.org>
Committed: Thu Jun 2 14:47:57 2016 +0530
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../java/org/apache/solr/cloud/LockTree.java | 182 +++++++++++++++++++
.../OverseerCollectionConfigSetProcessor.java | 17 +-
.../cloud/OverseerCollectionMessageHandler.java | 55 +++++-
.../cloud/OverseerConfigSetMessageHandler.java | 35 ++--
.../solr/cloud/OverseerMessageHandler.java | 33 +---
.../solr/cloud/OverseerTaskProcessor.java | 88 ++++-----
.../apache/solr/cloud/MultiThreadedOCPTest.java | 35 ++--
.../org/apache/solr/cloud/TestLockTree.java | 130 +++++++++++++
.../solr/common/params/CollectionParams.java | 127 ++++++++-----
.../org/apache/solr/common/util/StrUtils.java | 3 +-
11 files changed, 536 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/459a9c77/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index d395dfc..69a7951 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -275,6 +275,8 @@ Optimizations
* SOLR-9147: Upgrade commons-io to 2.5, avoid expensive array resizing in EmbeddedSolrServer (Mikhail Khludnev)
+* SOLR-8744: Overseer operations performed with fine grained mutual exclusion (noble, Scott Blum)
+
Other Changes
----------------------
* SOLR-7516: Improve javadocs for JavaBinCodec, ObjectResolver and enforce the single-usage policy.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/459a9c77/solr/core/src/java/org/apache/solr/cloud/LockTree.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/LockTree.java b/solr/core/src/java/org/apache/solr/cloud/LockTree.java
new file mode 100644
index 0000000..d629d1c
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/LockTree.java
@@ -0,0 +1,182 @@
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.solr.cloud.OverseerMessageHandler.Lock;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CollectionParams.LockLevel;
+import org.apache.solr.common.util.StrUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utility class that offers fine grained locking for Collection operations.
+ * <p>
+ * Locks form a tree rooted at {@link LockLevel#CLUSTER}; each element of a lock path selects a
+ * child one level deeper (for collection operations: collection, shard, replica). A lock on a
+ * node is refused while that node, any ancestor on its path, or any of its descendants is locked.
+ * <p>
+ * This class is designed for single threaded operation. It is safe for multiple threads to use
+ * it, but every operation synchronizes on the tree so that only one thread proceeds at a time.
+ */
+public class LockTree {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final Node root = new Node(null, LockLevel.CLUSTER, null);
+
+  /**
+   * Forcibly drops every lock and every node of the tree. Meant to be called only when no task
+   * is running; any lock still held at that point is logged as leaked.
+   */
+  public void clear() {
+    synchronized (this) {
+      root.clear();
+    }
+  }
+
+  /** Handle for a lock held on a single node; unlocking is serialized on the owning tree. */
+  private class LockImpl implements Lock {
+    final Node node;
+
+    LockImpl(Node node) {
+      this.node = node;
+    }
+
+    @Override
+    public void unlock() {
+      synchronized (LockTree.this) {
+        node.unlock(this);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return StrUtils.join(node.constructPath(new LinkedList<>()), '/');
+    }
+  }
+
+  /**
+   * Lock view used while processing one batch of tasks. Besides acquiring locks, a session
+   * remembers the paths of tasks whose lock attempt failed and refuses later tasks of the same
+   * session on the same or an enclosing path, so a later task cannot overtake an earlier waiting one.
+   */
+  public class Session {
+    private final SessionNode root = new SessionNode(LockLevel.CLUSTER);
+
+    /**
+     * Tries to lock {@code path} at the level required by {@code action}.
+     *
+     * @param action the operation; its {@code lockLevel} decides how deep into the path the lock goes
+     * @param path   path elements below the cluster level; elements deeper than the lock level are
+     *               ignored, but the path must hold at least {@code lockLevel.level} elements
+     * @return the acquired lock, {@link LockTree#FREELOCK} if the action needs no lock, or null if
+     *         the lock is unavailable or an earlier task of this session is waiting on the same or
+     *         an enclosing path
+     */
+    public Lock lock(CollectionParams.CollectionAction action, List<String> path) {
+      synchronized (LockTree.this) {
+        if (action.lockLevel == LockLevel.NONE) return FREELOCK;
+        if (root.isBusy(action.lockLevel, path)) return null;
+        Lock lockObject = LockTree.this.root.lock(action.lockLevel, path);
+        if (lockObject == null) {
+          // Record the failure at the depth of the requested lock level. Callers may pass
+          // fixed-length paths (e.g. collection, shard, replica) whose trailing elements are unused;
+          // marking at the full path depth would hide a waiting collection- or shard-level task
+          // from isBusy(), which never looks deeper than the requested level.
+          root.markBusy(path, 0, Math.min(action.lockLevel.level, path.size()));
+        }
+        return lockObject;
+      }
+    }
+  }
+
+  /** Per-session record of the paths whose lock attempts failed. */
+  private static class SessionNode {
+    final LockLevel level;
+    Map<String, SessionNode> kids;
+    boolean busy = false;
+
+    SessionNode(LockLevel level) {
+      this.level = level;
+    }
+
+    /**
+     * Marks the node reached after {@code targetDepth} path elements as busy, creating the
+     * intermediate nodes as needed.
+     *
+     * @param depth       depth of this node (the root is at depth 0)
+     * @param targetDepth depth of the node to mark; must not exceed {@code path.size()}
+     */
+    void markBusy(List<String> path, int depth, int targetDepth) {
+      if (depth >= targetDepth) {
+        busy = true;
+        return;
+      }
+      if (kids == null) kids = new HashMap<>();
+      SessionNode node = kids.get(path.get(depth));
+      if (node == null) kids.put(path.get(depth), node = new SessionNode(level.getChild()));
+      node.markBusy(path, depth + 1, targetDepth);
+    }
+
+    /**
+     * Returns true if any node along {@code path}, from this node down to the node at
+     * {@code lockLevel}, has been marked busy.
+     */
+    boolean isBusy(LockLevel lockLevel, List<String> path) {
+      if (!lockLevel.isHigherOrEqual(level)) return false;
+      if (busy) return true;
+      // Nodes below the requested level are never inspected, so stop here instead of indexing
+      // past the relevant part of the path (which may also be shorter than the tree is deep).
+      if (level == lockLevel || kids == null || level.level >= path.size()) return false;
+      SessionNode kid = kids.get(path.get(level.level));
+      return kid != null && kid.isBusy(lockLevel, path);
+    }
+  }
+
+  /** Starts a new, empty {@link Session} on this tree. */
+  public Session getSession() {
+    return new Session();
+  }
+
+  /** A node of the lock tree; {@code myLock} is non-null while this node itself is locked. */
+  private class Node {
+    final String name;
+    final Node mom;
+    final LockLevel level;
+    final Map<String, Node> children = new HashMap<>();
+    LockImpl myLock;
+
+    Node(String name, LockLevel level, Node mom) {
+      this.name = name;
+      this.level = level;
+      this.mom = mom;
+    }
+
+    /** Returns true if this node or any of its descendants is locked. */
+    boolean isLocked() {
+      if (myLock != null) return true;
+      for (Node node : children.values()) if (node.isLocked()) return true;
+      return false;
+    }
+
+    /** Releases {@code lockObject} if it is the lock currently held on this node. */
+    void unlock(LockImpl lockObject) {
+      if (myLock == lockObject) myLock = null;
+      else {
+        LOG.info("Unlocked multiple times : {}", lockObject.toString());
+      }
+    }
+
+    /**
+     * Walks down {@code path} to the node at {@code lockLevel}, creating nodes as needed, and
+     * locks it.
+     *
+     * @return the new lock, or null if a node on the way down (including the target) is locked
+     *         or, at the target level, any descendant is locked
+     */
+    Lock lock(LockLevel lockLevel, List<String> path) {
+      if (myLock != null) return null; // I'm already locked; no need to go any further
+      if (lockLevel == level) {
+        // the lock is acquired at this level, which is impossible while any descendant is locked
+        if (isLocked()) return null;
+        return myLock = new LockImpl(this);
+      } else {
+        String childName = path.get(level.level);
+        Node child = children.get(childName);
+        if (child == null)
+          children.put(childName, child = new Node(childName, LockLevel.getLevel(level.level + 1), this));
+        return child.lock(lockLevel, path);
+      }
+    }
+
+    /** Prepends the names from this node up to the root onto {@code collect}, root first. */
+    LinkedList<String> constructPath(LinkedList<String> collect) {
+      if (name != null) collect.addFirst(name);
+      if (mom != null) mom.constructPath(collect);
+      return collect;
+    }
+
+    /** Drops every lock in this subtree (logging held ones as leaked) and discards the children. */
+    void clear() {
+      if (myLock != null) {
+        LOG.warn("lock_is_leaked at {}", constructPath(new LinkedList<>()));
+        myLock = null;
+      }
+      for (Node node : children.values()) node.clear();
+      // Nodes are otherwise never removed, so drop the subtree to keep names of old
+      // collections/shards/replicas from accumulating forever; lock() recreates them on demand.
+      children.clear();
+    }
+  }
+
+  /** Lock handed out for actions that need no locking; unlocking it does nothing. */
+  static final Lock FREELOCK = () -> {};
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/459a9c77/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
index f1d0ab2..f8f8446 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionConfigSetProcessor.java
@@ -16,10 +16,10 @@
*/
package org.apache.solr.cloud;
-import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.solr.handler.component.ShardHandlerFactory;
+
import static org.apache.solr.cloud.OverseerConfigSetMessageHandler.CONFIGSETS_ACTION_PREFIX;
/**
@@ -61,8 +61,6 @@ public class OverseerCollectionConfigSetProcessor extends OverseerTaskProcessor
super(
zkStateReader,
myId,
- shardHandlerFactory,
- adminPath,
stats,
getOverseerMessageHandlerSelector(zkStateReader, myId, shardHandlerFactory,
adminPath, stats, overseer, overseerNodePrioritizer),
@@ -85,15 +83,12 @@ public class OverseerCollectionConfigSetProcessor extends OverseerTaskProcessor
zkStateReader, myId, shardHandlerFactory, adminPath, stats, overseer, overseerNodePrioritizer);
final OverseerConfigSetMessageHandler configMessageHandler = new OverseerConfigSetMessageHandler(
zkStateReader);
- return new OverseerMessageHandlerSelector() {
- @Override
- public OverseerMessageHandler selectOverseerMessageHandler(ZkNodeProps message) {
- String operation = message.getStr(Overseer.QUEUE_OPERATION);
- if (operation != null && operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
- return configMessageHandler;
- }
- return collMessageHandler;
+ return message -> {
+ String operation = message.getStr(Overseer.QUEUE_OPERATION);
+ if (operation != null && operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
+ return configMessageHandler;
}
+ return collMessageHandler;
};
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/459a9c77/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
index ed23e77..54c0697 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java
@@ -179,7 +179,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
// Set that tracks collections that are currently being processed by a running task.
// This is used for handling mutual exclusion of the tasks.
- final private Set collectionWip;
+
+ final private LockTree lockTree = new LockTree();
static final Random RANDOM;
static {
@@ -206,7 +207,6 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
this.stats = stats;
this.overseer = overseer;
this.overseerPrioritizer = overseerPrioritizer;
- this.collectionWip = new HashSet();
}
@Override
@@ -216,10 +216,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
NamedList results = new NamedList();
try {
- CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(operation);
- if (action == null) {
- throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" + operation);
- }
+ CollectionParams.CollectionAction action = getCollectionAction(operation);
switch (action) {
case CREATE:
createCollection(zkStateReader.getClusterState(), message, results);
@@ -287,6 +284,13 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
case RESTORE:
processRestoreAction(message, results);
break;
+ case MOCK_COLL_TASK:
+ case MOCK_SHARD_TASK:
+ case MOCK_REPLICA_TASK: {
+ //only for test purposes
+ Thread.sleep(message.getInt("sleep", 1));
+ break;
+ }
default:
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:"
+ operation);
@@ -311,6 +315,14 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
return new OverseerSolrResponse(results);
}
+  /** Resolves {@code operation} to its CollectionAction, rejecting unknown names with BAD_REQUEST. */
+  private CollectionParams.CollectionAction getCollectionAction(String operation) {
+    final CollectionParams.CollectionAction resolved = CollectionParams.CollectionAction.get(operation);
+    if (resolved != null) {
+      return resolved;
+    }
+    throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" + operation);
+  }
+
//
// TODO DWS: this class has gone out of control (too big); refactor to break it up
//
@@ -2663,7 +2675,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
message.getStr(COLLECTION_PROP) : message.getStr(NAME);
}
- @Override
+
+ /* @Override
public void markExclusiveTask(String collectionName, ZkNodeProps message) {
if (collectionName != null) {
synchronized (collectionWip) {
@@ -2679,8 +2692,8 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
collectionWip.remove(collectionName);
}
}
- }
-
+ }*/
+/*
@Override
public ExclusiveMarking checkExclusiveMarking(String collectionName, ZkNodeProps message) {
synchronized (collectionWip) {
@@ -2689,5 +2702,29 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
}
return ExclusiveMarking.NOTDETERMINED;
+ }*/
+
+ private long sessionId = -1;
+ private LockTree.Session lockSession;
+
+  /**
+   * Tries to lock the collection/shard/replica path addressed by {@code message} within the
+   * lock session belonging to the current task batch.
+   *
+   * @return the acquired lock, or null if the task cannot run now
+   */
+  @Override
+  public Lock lockTask(ZkNodeProps message, OverseerTaskProcessor.TaskBatch taskBatch) {
+    if (lockSession == null || sessionId != taskBatch.getId()) {
+      // This is always called from the same thread.
+      // Each batch gets a new batch id, so when the id changes we must create a new Session.
+      // Also check whether there are no running tasks. If so, clear the lockTree;
+      // this ensures that locks are not 'leaked'.
+      if(taskBatch.getRunningTasks() == 0) lockTree.clear();
+      lockSession = lockTree.getSession();
+      // Remember which batch this session belongs to. Without this every call would start a
+      // fresh session, and tasks of the same batch could overtake earlier waiting ones.
+      sessionId = taskBatch.getId();
+    }
+    return lockSession.lock(getCollectionAction(message.getStr(Overseer.QUEUE_OPERATION)),
+        Arrays.asList(
+            getTaskKey(message),
+            message.getStr(ZkStateReader.SHARD_ID_PROP),
+            message.getStr(ZkStateReader.REPLICA_PROP))
+
+    );
}
+
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/459a9c77/solr/core/src/java/org/apache/solr/cloud/OverseerConfigSetMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerConfigSetMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerConfigSetMessageHandler.java
index ba8f129..2f2859f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerConfigSetMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerConfigSetMessageHandler.java
@@ -45,8 +45,6 @@ import org.noggit.JSONUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.cloud.OverseerMessageHandler.ExclusiveMarking.NONEXCLUSIVE;
-import static org.apache.solr.cloud.OverseerMessageHandler.ExclusiveMarking.NOTDETERMINED;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.params.ConfigSetParams.ConfigSetAction.CREATE;
@@ -147,12 +145,22 @@ public class OverseerConfigSetMessageHandler implements OverseerMessageHandler {
}
@Override
+  public Lock lockTask(ZkNodeProps message, OverseerTaskProcessor.TaskBatch taskBatch) {
+    // Lock is keyed by the ConfigSet name; refuse while a conflicting ConfigSet operation is in progress.
+    final String configSetName = getTaskKey(message);
+    if (!canExecute(configSetName, message)) {
+      return null;
+    }
+    markExclusiveTask(configSetName, message);
+    return () -> unmarkExclusiveTask(configSetName, message);
+  }
+
+ @Override
public String getTaskKey(ZkNodeProps message) {
return message.getStr(NAME);
}
- @Override
- public void markExclusiveTask(String configSetName, ZkNodeProps message) {
+
+ private void markExclusiveTask(String configSetName, ZkNodeProps message) {
String baseConfigSet = getBaseConfigSetIfCreate(message);
markExclusive(configSetName, baseConfigSet);
}
@@ -164,8 +172,7 @@ public class OverseerConfigSetMessageHandler implements OverseerMessageHandler {
}
}
- @Override
- public void unmarkExclusiveTask(String configSetName, String operation, ZkNodeProps message) {
+ private void unmarkExclusiveTask(String configSetName, ZkNodeProps message) {
String baseConfigSet = getBaseConfigSetIfCreate(message);
unmarkExclusiveConfigSet(configSetName, baseConfigSet);
}
@@ -177,28 +184,26 @@ public class OverseerConfigSetMessageHandler implements OverseerMessageHandler {
}
}
- @Override
- public ExclusiveMarking checkExclusiveMarking(String configSetName, ZkNodeProps message) {
- String baseConfigSet = getBaseConfigSetIfCreate(message);
- return checkExclusiveMarking(configSetName, baseConfigSet);
- }
- private ExclusiveMarking checkExclusiveMarking(String configSetName, String baseConfigSetName) {
+ private boolean canExecute(String configSetName, ZkNodeProps message) {
+ String baseConfigSetName = getBaseConfigSetIfCreate(message);
+
synchronized (configSetWriteWip) {
// need to acquire:
// 1) write lock on ConfigSet
// 2) read lock on Base ConfigSet
if (configSetWriteWip.contains(configSetName) || configSetReadWip.contains(configSetName)) {
- return NONEXCLUSIVE;
+ return false;
}
if (baseConfigSetName != null && configSetWriteWip.contains(baseConfigSetName)) {
- return NONEXCLUSIVE;
+ return false;
}
}
- return NOTDETERMINED;
+ return true;
}
+
private String getBaseConfigSetIfCreate(ZkNodeProps message) {
String operation = message.getStr(Overseer.QUEUE_OPERATION);
if (operation != null) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/459a9c77/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java
index 2d2408f..c4027cc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java
@@ -44,37 +44,20 @@ public interface OverseerMessageHandler {
*/
String getTimerName(String operation);
- /**
- * @param message the message being processed
- *
- * @return the taskKey for the message for handling task exclusivity
- */
- String getTaskKey(ZkNodeProps message);
+ interface Lock {
+ void unlock();
+ }
- /**
- * @param taskKey the key associated with the task, cached from getTaskKey
- * @param message the message being processed
- */
- void markExclusiveTask(String taskKey, ZkNodeProps message);
-
- /**
- * @param taskKey the key associated with the task
- * @param operation the operation being processed
- * @param message the message being processed
+  /** Try to provide an exclusive lock for this particular task.
+   * Return null if locking is not possible; if no locking is necessary, a lock whose unlock() does nothing may be returned.
*/
- void unmarkExclusiveTask(String taskKey, String operation, ZkNodeProps message);
+ Lock lockTask(ZkNodeProps message, OverseerTaskProcessor.TaskBatch taskBatch);
/**
- * @param taskKey the key associated with the task
* @param message the message being processed
*
- * @return the exclusive marking
+ * @return the taskKey for the message for handling task exclusivity
*/
- ExclusiveMarking checkExclusiveMarking(String taskKey, ZkNodeProps message);
+ String getTaskKey(ZkNodeProps message);
- enum ExclusiveMarking {
- NOTDETERMINED, // not enough context, fall back to the processor (i.e. look at running tasks)
- EXCLUSIVE,
- NONEXCLUSIVE
- }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/459a9c77/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index 26a90cb..93a7e6f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -81,10 +81,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
private String myId;
- private final ShardHandlerFactory shardHandlerFactory;
-
- private String adminPath;
-
private ZkStateReader zkStateReader;
private boolean isClosed;
@@ -102,8 +98,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
private OverseerNodePrioritizer prioritizer;
public OverseerTaskProcessor(ZkStateReader zkStateReader, String myId,
- final ShardHandlerFactory shardHandlerFactory,
- String adminPath,
Overseer.Stats stats,
OverseerMessageHandlerSelector selector,
OverseerNodePrioritizer prioritizer,
@@ -113,8 +107,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
DistributedMap failureMap) {
this.zkStateReader = zkStateReader;
this.myId = myId;
- this.shardHandlerFactory = shardHandlerFactory;
- this.adminPath = adminPath;
this.stats = stats;
this.selector = selector;
this.prioritizer = prioritizer;
@@ -206,10 +198,11 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
if (isClosed) break;
+ taskBatch.batchId++;
for (QueueEvent head : heads) {
+ if (runningZKTasks.contains(head.getId())) continue;
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
OverseerMessageHandler messageHandler = selector.selectOverseerMessageHandler(message);
- String taskKey = messageHandler.getTaskKey(message);
final String asyncId = message.getStr(ASYNC);
if (hasLeftOverItems) {
if (head.getId().equals(oldestItemInWorkQueue))
@@ -220,27 +213,29 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
continue;
}
}
-
- if (!checkExclusivity(messageHandler, message, head.getId())) {
+ String operation = message.getStr(Overseer.QUEUE_OPERATION);
+ OverseerMessageHandler.Lock lock = messageHandler.lockTask(message, taskBatch);
+ if (lock == null) {
log.debug("Exclusivity check failed for [{}]", message.toString());
continue;
}
-
try {
- markTaskAsRunning(messageHandler, head, taskKey, asyncId, message);
+ markTaskAsRunning(head, asyncId);
log.debug("Marked task [{}] as running", head.getId());
} catch (KeeperException.NodeExistsException e) {
+ lock.unlock();
// This should never happen
log.error("Tried to pick up task [{}] when it was already running!", head.getId());
+ continue;
} catch (InterruptedException e) {
+ lock.unlock();
log.error("Thread interrupted while trying to pick task for execution.", head.getId());
Thread.currentThread().interrupt();
+ continue;
}
-
log.info(messageHandler.getName() + ": Get the message id:" + head.getId() + " message:" + message.toString());
- String operation = message.getStr(Overseer.QUEUE_OPERATION);
Runner runner = new Runner(messageHandler, message,
- operation, head);
+ operation, head, lock);
tpe.execute(runner);
}
@@ -262,31 +257,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
}
}
- protected boolean checkExclusivity(OverseerMessageHandler messageHandler, ZkNodeProps message, String id)
- throws KeeperException, InterruptedException {
- String taskKey = messageHandler.getTaskKey(message);
-
- if (taskKey == null)
- return true;
-
- OverseerMessageHandler.ExclusiveMarking marking = messageHandler.checkExclusiveMarking(taskKey, message);
- switch (marking) {
- case NOTDETERMINED:
- break;
- case EXCLUSIVE:
- return true;
- case NONEXCLUSIVE:
- return false;
- default:
- throw new IllegalArgumentException("Undefined marking: " + marking);
- }
-
- if (runningZKTasks.contains(id))
- return false;
-
- return true;
- }
-
private void cleanUpWorkQueue() throws KeeperException, InterruptedException {
synchronized (completedTasks) {
for (String id : completedTasks.keySet()) {
@@ -390,8 +360,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
}
@SuppressWarnings("unchecked")
- private void markTaskAsRunning(OverseerMessageHandler messageHandler, QueueEvent head, String taskKey,
- String asyncId, ZkNodeProps message)
+ private void markTaskAsRunning(QueueEvent head, String asyncId)
throws KeeperException, InterruptedException {
synchronized (runningZKTasks) {
runningZKTasks.add(head.getId());
@@ -401,7 +370,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
runningTasks.add(head.getId());
}
- messageHandler.markExclusiveTask(taskKey, message);
+// messageHandler.markExclusiveTask(taskKey, message);
if (asyncId != null)
runningMap.put(asyncId, null);
@@ -413,12 +382,14 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
SolrResponse response;
QueueEvent head;
OverseerMessageHandler messageHandler;
-
- public Runner(OverseerMessageHandler messageHandler, ZkNodeProps message, String operation, QueueEvent head) {
+ private final OverseerMessageHandler.Lock lock;
+
+ public Runner(OverseerMessageHandler messageHandler, ZkNodeProps message, String operation, QueueEvent head, OverseerMessageHandler.Lock lock) {
this.message = message;
this.operation = operation;
this.head = head;
this.messageHandler = messageHandler;
+ this.lock = lock;
response = null;
}
@@ -454,7 +425,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
log.debug("Completed task:[{}]", head.getId());
}
- markTaskComplete(messageHandler, head.getId(), asyncId, taskKey, message);
+ markTaskComplete(head.getId(), asyncId);
log.debug("Marked task [{}] as completed.", head.getId());
printTrackingMaps();
@@ -469,6 +440,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
log.warn("Resetting task {} as the thread was interrupted.", head.getId());
Thread.currentThread().interrupt();
} finally {
+ lock.unlock();
if (!success) {
// Reset task from tracking data structures so that it can be retried.
resetTaskWithException(messageHandler, head.getId(), asyncId, taskKey, message);
@@ -479,7 +451,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
}
}
- private void markTaskComplete(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey, ZkNodeProps message)
+ private void markTaskComplete(String id, String asyncId)
throws KeeperException, InterruptedException {
synchronized (completedTasks) {
completedTasks.put(id, head);
@@ -494,9 +466,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
log.warn("Could not find and remove async call [" + asyncId + "] from the running map.");
}
}
-
-
- messageHandler.unmarkExclusiveTask(taskKey, operation, message);
}
private void resetTaskWithException(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey, ZkNodeProps message) {
@@ -512,7 +481,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
runningTasks.remove(id);
}
- messageHandler.unmarkExclusiveTask(taskKey, operation, message);
} catch (KeeperException e) {
SolrException.log(log, "", e);
} catch (InterruptedException e) {
@@ -568,4 +536,20 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
OverseerMessageHandler selectOverseerMessageHandler(ZkNodeProps message);
}
+  final private TaskBatch taskBatch = new TaskBatch();
+
+  /**
+   * Identifies the current pass over the fetched work-queue heads. The processor increments
+   * {@code batchId} once per pass; message handlers compare ids to tell passes apart.
+   */
+  public class TaskBatch {
+    private long batchId = 0;
+
+    /** @return the id of the current batch */
+    public long getId() {
+      return batchId;
+    }
+
+    /** @return the number of tasks currently tracked in {@code runningTasks} */
+    public int getRunningTasks() {
+      final int count;
+      synchronized (runningTasks) {
+        count = runningTasks.size();
+      }
+      return count;
+    }
+  }
+
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/459a9c77/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java b/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
index 1195583..c18b330 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MultiThreadedOCPTest.java
@@ -30,8 +30,10 @@ import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,27 +101,32 @@ public class MultiThreadedOCPTest extends AbstractFullDistribZkTestBase {
}
}
- private void testTaskExclusivity() throws IOException, SolrServerException {
+ private void testTaskExclusivity() throws Exception, SolrServerException {
+
+ DistributedQueue distributedQueue = new DistributedQueue(cloudClient.getZkStateReader().getZkClient(),
+ "/overseer/collection-queue-work", new Overseer.Stats());
try (SolrClient client = createNewSolrClient("", getBaseUrl((HttpSolrClient) clients.get(0)))) {
+
Create createCollectionRequest = new Create()
.setCollectionName("ocptest_shardsplit")
.setNumShards(4)
.setConfigName("conf1")
.setAsyncId("1000");
createCollectionRequest.process(client);
-
- SplitShard splitShardRequest = new SplitShard()
- .setCollectionName("ocptest_shardsplit")
- .setShardName(SHARD1)
- .setAsyncId("1001");
- splitShardRequest.process(client);
-
- splitShardRequest = new SplitShard()
- .setCollectionName("ocptest_shardsplit")
- .setShardName(SHARD2)
- .setAsyncId("1002");
- splitShardRequest.process(client);
-
+
+ distributedQueue.offer(Utils.toJSON(Utils.makeMap(
+ "collection", "ocptest_shardsplit",
+ Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MOCK_COLL_TASK.toLower(),
+ CommonAdminParams.ASYNC, "1001",
+ "sleep", "100"
+ )));
+ distributedQueue.offer(Utils.toJSON(Utils.makeMap(
+ "collection", "ocptest_shardsplit",
+ Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.MOCK_COLL_TASK.toLower(),
+ CommonAdminParams.ASYNC, "1002",
+ "sleep", "100"
+ )));
+
int iterations = 0;
while(true) {
int runningTasks = 0;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/459a9c77/solr/core/src/test/org/apache/solr/cloud/TestLockTree.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestLockTree.java b/solr/core/src/test/org/apache/solr/cloud/TestLockTree.java
new file mode 100644
index 0000000..7e4d9b7
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/TestLockTree.java
@@ -0,0 +1,130 @@
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.cloud.OverseerMessageHandler.Lock;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.CollectionParams.CollectionAction;
+import org.apache.solr.common.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICAPROP;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.DELETEREPLICA;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.SPLITSHARD;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+public class TestLockTree extends SolrTestCaseJ4 {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+
+ public void testLocks() throws Exception {
+ LockTree lockTree = new LockTree();
+ Lock coll1Lock = lockTree.getSession().lock(CollectionAction.CREATE,
+ Arrays.asList("coll1"));
+ assertNotNull(coll1Lock);
+ assertNull("Should not be able to lock coll1/shard1", lockTree.getSession().lock(CollectionAction.BALANCESHARDUNIQUE,
+ Arrays.asList("coll1", "shard1")));
+
+ assertNull(lockTree.getSession().lock(ADDREPLICAPROP,
+ Arrays.asList("coll1", "shard1", "core_node2")));
+ coll1Lock.unlock();
+ Lock shard1Lock = lockTree.getSession().lock(CollectionAction.BALANCESHARDUNIQUE,
+ Arrays.asList("coll1", "shard1"));
+ assertNotNull(shard1Lock);
+ shard1Lock.unlock();
+ Lock replica1Lock = lockTree.getSession().lock(ADDREPLICAPROP,
+ Arrays.asList("coll1", "shard1", "core_node2"));
+ assertNotNull(replica1Lock);
+
+
+ List<Pair<CollectionAction, List<String>>> operations = new ArrayList<>();
+ operations.add(new Pair<>(ADDREPLICAPROP, Arrays.asList("coll1", "shard1", "core_node2")));
+ operations.add(new Pair<>(MODIFYCOLLECTION, Arrays.asList("coll1")));
+ operations.add(new Pair<>(SPLITSHARD, Arrays.asList("coll1", "shard1")));
+ operations.add(new Pair<>(SPLITSHARD, Arrays.asList("coll2", "shard2")));
+ operations.add(new Pair<>(MODIFYCOLLECTION, Arrays.asList("coll2")));
+ operations.add(new Pair<>(DELETEREPLICA, Arrays.asList("coll2", "shard1")));
+
+ List<Set<String>> orderOfExecution = Arrays.asList(
+ ImmutableSet.of("coll1/shard1/core_node2", "coll2/shard2"),
+ ImmutableSet.of("coll1", "coll2"),
+ ImmutableSet.of("coll1/shard1", "coll2/shard1"));
+ lockTree = new LockTree();
+ for (int counter = 0; counter < orderOfExecution.size(); counter++) {
+ LockTree.Session session = lockTree.getSession();
+ List<Pair<CollectionAction, List<String>>> completedOps = new CopyOnWriteArrayList<>();
+ List<Lock> locks = new CopyOnWriteArrayList<>();
+ List<Thread> threads = new ArrayList<>();
+ for (int i = 0; i < operations.size(); i++) {
+ Pair<CollectionAction, List<String>> operation = operations.get(i);
+ final Lock lock = session.lock(operation.first(), operation.second());
+ if (lock != null) {
+ Thread thread = new Thread(getRunnable(completedOps, operation, locks, lock));
+ threads.add(thread);
+ thread.start();
+ }
+ }
+
+
+ for (Thread thread : threads) thread.join();
+ if (locks.isEmpty())
+ throw new RuntimeException("Could not attain lock for anything " + operations);
+
+ Set<String> expectedOps = orderOfExecution.get(counter);
+ log.info("counter : {} , expected : {}, actual : {}", counter, expectedOps, locks);
+ assertEquals(expectedOps.size(), locks.size());
+ for (Lock lock : locks)
+ assertTrue("locks : " + locks + " expectedOps : " + expectedOps, expectedOps.contains(lock.toString()));
+ locks.clear();
+ for (Pair<CollectionAction, List<String>> completedOp : completedOps) {
+ operations.remove(completedOp);
+ }
+ }
+ }
+
+ private Runnable getRunnable(List<Pair<CollectionAction, List<String>>> completedOps, Pair<CollectionAction,
+ List<String>> operation, List<Lock> locks, Lock lock) {
+ return () -> {
+ try {
+ Thread.sleep(1);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ lock.unlock();
+ completedOps.add(operation);
+ locks.add(lock);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/459a9c77/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
index cc505f8..42cf372 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
@@ -18,65 +18,104 @@ package org.apache.solr.common.params;
import java.util.Locale;
-public interface CollectionParams
-{
- /** What action **/
- public final static String ACTION = "action";
- public final static String NAME = "name";
-
+public interface CollectionParams {
+ /**
+ * What action
+ **/
+ String ACTION = "action";
+ String NAME = "name";
- public enum CollectionAction {
- CREATE(true),
- DELETE(true),
- RELOAD(true),
- SYNCSHARD(true),
- CREATEALIAS(true),
- DELETEALIAS(true),
- SPLITSHARD(true),
- DELETESHARD(true),
- CREATESHARD(true),
- DELETEREPLICA(true),
- FORCELEADER(true),
- MIGRATE(true),
- ADDROLE(true),
- REMOVEROLE(true),
- CLUSTERPROP(true),
- REQUESTSTATUS(false),
- DELETESTATUS(false),
- ADDREPLICA(true),
- OVERSEERSTATUS(false),
- LIST(false),
- CLUSTERSTATUS(false),
- ADDREPLICAPROP(true),
- DELETEREPLICAPROP(true),
- BALANCESHARDUNIQUE(true),
- REBALANCELEADERS(true),
- MODIFYCOLLECTION(true),
- MIGRATESTATEFORMAT(true),
- BACKUP(true),
- RESTORE(true);
-
+ enum LockLevel {
+ CLUSTER(0),
+ COLLECTION(1),
+ SHARD(2),
+ REPLICA(3),
+ NONE(10);
+
+ public final int level;
+
+ LockLevel(int i) {
+ this.level = i;
+ }
+
+ public LockLevel getChild() {
+ return getLevel(level + 1);
+ }
+
+ public static LockLevel getLevel(int i) {
+ for (LockLevel v : values()) {
+ if (v.level == i) return v;
+ }
+ return null;
+ }
+
+ public boolean isHigherOrEqual(LockLevel that) {
+ return that.level <= level;
+ }
+ }
+
+ enum CollectionAction {
+ CREATE(true, LockLevel.COLLECTION),
+ DELETE(true, LockLevel.COLLECTION),
+ RELOAD(true, LockLevel.COLLECTION),
+ SYNCSHARD(true, LockLevel.SHARD),
+ CREATEALIAS(true, LockLevel.COLLECTION),
+ DELETEALIAS(true, LockLevel.COLLECTION),
+ SPLITSHARD(true, LockLevel.SHARD),
+ DELETESHARD(true, LockLevel.SHARD),
+ CREATESHARD(true, LockLevel.COLLECTION),
+ DELETEREPLICA(true, LockLevel.SHARD),
+ FORCELEADER(true, LockLevel.SHARD),
+ MIGRATE(true, LockLevel.SHARD),
+ ADDROLE(true, LockLevel.NONE),
+ REMOVEROLE(true, LockLevel.NONE),
+ CLUSTERPROP(true, LockLevel.NONE),
+ REQUESTSTATUS(false, LockLevel.NONE),
+ DELETESTATUS(false, LockLevel.NONE),
+ ADDREPLICA(true, LockLevel.SHARD),
+ OVERSEERSTATUS(false, LockLevel.NONE),
+ LIST(false, LockLevel.NONE),
+ CLUSTERSTATUS(false, LockLevel.NONE),
+ ADDREPLICAPROP(true, LockLevel.REPLICA),
+ DELETEREPLICAPROP(true, LockLevel.REPLICA),
+ BALANCESHARDUNIQUE(true, LockLevel.SHARD),
+ REBALANCELEADERS(true, LockLevel.COLLECTION),
+ MODIFYCOLLECTION(true, LockLevel.COLLECTION),
+ MIGRATESTATEFORMAT(true, LockLevel.CLUSTER),
+ BACKUP(true, LockLevel.COLLECTION),
+ RESTORE(true, LockLevel.COLLECTION),
+ //only for testing. it just waits for specified time
+ // these are not exposed via collection API commands
+ // but the overseer is aware of these tasks
+ MOCK_COLL_TASK(false, LockLevel.COLLECTION),
+ MOCK_SHARD_TASK(false, LockLevel.SHARD),
+ MOCK_REPLICA_TASK(false, LockLevel.REPLICA)
+ ;
public final boolean isWrite;
+ public final LockLevel lockLevel;
- CollectionAction(boolean isWrite) {
+ CollectionAction(boolean isWrite, LockLevel level) {
this.isWrite = isWrite;
+ this.lockLevel = level;
}
public static CollectionAction get(String p) {
- if( p != null ) {
+ if (p != null) {
try {
- return CollectionAction.valueOf( p.toUpperCase(Locale.ROOT) );
+ return CollectionAction.valueOf(p.toUpperCase(Locale.ROOT));
+ } catch (Exception ex) {
}
- catch( Exception ex ) {}
}
return null;
}
- public boolean isEqual(String s){
- if(s == null) return false;
+
+ public boolean isEqual(String s) {
+ if (s == null) return false;
return toString().equals(s.toUpperCase(Locale.ROOT));
}
- public String toLower(){
+
+ public String toLower() {
return toString().toLowerCase(Locale.ROOT);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/459a9c77/solr/solrj/src/java/org/apache/solr/common/util/StrUtils.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/StrUtils.java b/solr/solrj/src/java/org/apache/solr/common/util/StrUtils.java
index 5fa0fae..995e142 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/StrUtils.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/StrUtils.java
@@ -149,10 +149,11 @@ public class StrUtils {
* @see #escapeTextWithSeparator
*/
public static String join(Collection<?> items, char separator) {
+ if (items == null) return "";
StringBuilder sb = new StringBuilder(items.size() << 3);
boolean first=true;
for (Object o : items) {
- String item = o.toString();
+ String item = String.valueOf(o);
if (first) {
first = false;
} else {
[2/2] lucene-solr:master: Merge remote-tracking branch 'origin/master'
Posted by no...@apache.org.
Merge remote-tracking branch 'origin/master'
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/9863d3d2
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/9863d3d2
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/9863d3d2
Branch: refs/heads/master
Commit: 9863d3d2d786bf009d0f732d5bb561d184e1ae02
Parents: 459a9c7 09372ac
Author: Noble Paul <no...@apache.org>
Authored: Thu Jun 2 14:48:10 2016 +0530
Committer: Noble Paul <no...@apache.org>
Committed: Thu Jun 2 14:48:10 2016 +0530
----------------------------------------------------------------------
dev-tools/scripts/checkJavaDocs.py | 74 ++++---
lucene/CHANGES.txt | 4 +
.../java/org/apache/lucene/geo/Polygon2D.java | 18 +-
.../lucene/index/BufferedUpdatesStream.java | 30 ++-
.../apache/lucene/index/CoalescedUpdates.java | 14 +-
.../apache/lucene/index/DocValuesUpdate.java | 2 +-
.../java/org/apache/lucene/util/ArrayUtil.java | 71 +++++++
.../index/TestNumericDocValuesUpdates.java | 112 +++++++++-
.../lucene/search/spans/TestSpanCollection.java | 11 +-
.../org/apache/lucene/util/TestArrayUtil.java | 36 +++-
.../apache/lucene/spatial3d/geom/XYZBounds.java | 2 +-
solr/CHANGES.txt | 4 +-
.../org/apache/solr/util/SolrPluginUtils.java | 14 +-
.../apache/solr/cloud/TestSSLRandomization.java | 202 +++++++++++++++++++
.../apache/solr/util/SolrPluginUtilsTest.java | 28 ---
.../java/org/apache/solr/SolrTestCaseJ4.java | 27 ++-
.../java/org/apache/solr/util/RandomizeSSL.java | 174 ++++++++++++++++
17 files changed, 700 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9863d3d2/solr/CHANGES.txt
----------------------------------------------------------------------