Posted to commits@lucene.apache.org by ma...@apache.org on 2020/10/30 20:28:17 UTC
[lucene-solr] branch reference_impl_dev updated: @1084 The Overseer hits back.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new 4bcdd85 @1084 The Overseer hits back.
4bcdd85 is described below
commit 4bcdd85b9de32b7eff8a9d036849d64b1a43c258
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Oct 30 15:27:35 2020 -0500
@1084 The Overseer hits back.
---
.../src/java/org/apache/solr/cloud/Overseer.java | 25 +-
.../apache/solr/cloud/OverseerTaskProcessor.java | 60 +---
.../org/apache/solr/cloud/OverseerTaskQueue.java | 9 +-
.../solr/cloud/ShardLeaderElectionContextBase.java | 4 +-
.../org/apache/solr/cloud/ZkDistributedQueue.java | 302 +++++++++++++--------
.../cloud/api/collections/CreateCollectionCmd.java | 7 +-
.../solr/cloud/overseer/ClusterStateMutator.java | 8 +-
.../apache/solr/cloud/overseer/ZkStateWriter.java | 5 +-
.../src/java/org/apache/solr/core/SolrCore.java | 6 +-
.../java/org/apache/solr/update/CommitTracker.java | 9 +-
.../apache/solr/update/DirectUpdateHandler2.java | 2 +-
.../AbstractCloudBackupRestoreTestCase.java | 5 +-
.../solr/cloud/api/collections/ShardSplitTest.java | 6 +-
.../collections/TestLocalFSCloudBackupRestore.java | 2 +-
.../src/java/org/apache/solr/common/ParWork.java | 6 +-
.../src/resources/logconf/log4j2-startup-debug.xml | 14 +-
.../src/resources/logconf/log4j2-std-debug.xml | 2 +-
17 files changed, 264 insertions(+), 208 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 214e478..2af0f69 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -240,6 +240,8 @@ public class Overseer implements SolrCloseable {
int fallbackQueueSize = Integer.MAX_VALUE;
ZkDistributedQueue fallbackQueue = workQueue;
while (!checkClosed()) {
+ if (log.isDebugEnabled()) log.debug("Start of Overseer loop ...");
+
if (zkStateWriter == null) {
try {
zkStateWriter = new ZkStateWriter(reader, stats);
@@ -308,8 +310,8 @@ public class Overseer implements SolrCloseable {
try {
// We do not need to filter any nodes here cause all processed nodes are removed once we flush clusterstate
- long wait = 5000;
- queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, wait, (x) -> true));
+ long wait = 100;
+ queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, wait, (x) -> false));
} catch (AlreadyClosedException e) {
if (isClosed()) {
log.info("Overseer closed (AlreadyClosedException), exiting loop");
@@ -330,7 +332,7 @@ public class Overseer implements SolrCloseable {
}
try {
Set<String> processedNodes = new HashSet<>();
- int loopCnt = 0;
+
while (queue != null && !queue.isEmpty()) {
if (isClosed()) {
log.info("Closing");
@@ -355,12 +357,8 @@ public class Overseer implements SolrCloseable {
return;
}
// if an event comes in the next *ms batch it together
- int wait = 10;
- queue = new LinkedList<>(stateUpdateQueue.peekElements(100, wait, node -> !processedNodes.contains(node)));
- if (loopCnt >= 1) {
- break;
- }
- loopCnt++;
+ int wait = 5000;
+ queue = new LinkedList<>(stateUpdateQueue.peekElements(100, wait, node -> processedNodes.contains(node)));
}
fallbackQueueSize = processedNodes.size();
// we should force write all pending updates because the next iteration might sleep until there
@@ -485,19 +483,16 @@ public class Overseer implements SolrCloseable {
List<ZkWriteCommand> zkWriteOps = processMessage(prevState, message, operation);
- cs = zkStateWriter.enqueueUpdate(prevState, zkWriteOps, () -> {
- // log.info("on write callback");
- });
+ cs = zkStateWriter.enqueueUpdate(prevState, zkWriteOps, callback);
// }
-
+ if (log.isDebugEnabled()) log.debug("State update consumed from queue {}", message);
return cs;
}
private List<ZkWriteCommand> processMessage(ClusterState clusterState,
final ZkNodeProps message, final String operation) {
if (log.isDebugEnabled()) {
- // nocommit
- // log.debug("processMessage(ClusterState clusterState={}, ZkNodeProps message={}, String operation={}) - start", clusterState, message, operation);
+ log.debug("processMessage(ClusterState clusterState={}, ZkNodeProps message={}, String operation={}) - start", clusterState, message, operation);
}
CollectionParams.CollectionAction collectionAction = CollectionParams.CollectionAction.get(operation);
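Note on the predicate change above: in this branch the peekElements predicate acts as an exclude filter (children for which it returns true are skipped, even though the parameter is still named acceptFilter in fetchZkChildren further down), which is why both call sites flip their lambdas. A minimal, self-contained sketch of the batching idea under that assumption; the queue class and method below are simplified stand-ins, not the actual Solr API:

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.function.Predicate;

    // Simplified stand-in for the distributed state update queue; only the
    // exclude-filter contract of peekElements is modeled here.
    class StateUpdateQueue {
      private final Deque<String> nodes = new ArrayDeque<>(List.of("qn-1", "qn-2", "qn-3"));

      // Returns up to max elements whose names do NOT match the exclude filter.
      List<String> peekElements(int max, long waitMillis, Predicate<String> exclude) {
        List<String> out = new ArrayList<>();
        for (String n : nodes) {
          if (out.size() >= max) break;
          if (!exclude.test(n)) out.add(n);
        }
        return out;
      }
    }

    public class BatchingSketch {
      public static void main(String[] args) {
        StateUpdateQueue queue = new StateUpdateQueue();
        Set<String> processed = new HashSet<>();

        // First pass: exclude nothing, pick up whatever is already pending.
        List<String> batch = queue.peekElements(1000, 100, x -> false);
        while (!batch.isEmpty()) {
          for (String node : batch) {
            processed.add(node); // mark as handled for this flush cycle
            System.out.println("processed " + node);
          }
          // Later passes: exclude already-processed nodes so the same entries are
          // not handed back before the cluster state is flushed and they are removed.
          batch = queue.peekElements(100, 5000, processed::contains);
        }
      }
    }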
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index b7984cd..6888db1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -104,7 +104,10 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
@Override
public boolean test(String s) {
// nocommit
- return runningTasks.contains(s) || blockedTasks.containsKey(s) || runningZKTasks.contains(s);
+
+ boolean contains = runningTasks.contains(s) || blockedTasks.containsKey(s) || runningZKTasks.contains(s);
+ if (log.isDebugEnabled()) log.debug("test {} against {}, {}, {} : {}", s, runningTasks, blockedTasks, runningZKTasks, contains);
+ return contains;
}
@Override
@@ -174,24 +177,8 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
}
if (oldestItemInWorkQueue == null) hasLeftOverItems = false;
- else log.debug(
- "Found already existing elements in the work-queue. Last element: {}",
- oldestItemInWorkQueue);
-
- if (prioritizer != null) {
- try {
- prioritizer.prioritizeOverseerNodes(myId);
- } catch (Exception e) {
- ParWork.propagateInterrupt(e);
- if (e instanceof KeeperException.SessionExpiredException) {
- return;
- }
- if (e instanceof InterruptedException
- || e instanceof AlreadyClosedException) {
- return;
- }
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
- }
+ else {
+ if (log.isDebugEnabled()) log.debug("Found already existing elements in the work-queue. Last element: {}", oldestItemInWorkQueue);
}
while (!this.isClosed()) {
@@ -214,6 +201,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
if (waited) cleanUpWorkQueue();
ArrayList<QueueEvent> heads = new ArrayList<>(blockedTasks.size() + MAX_PARALLEL_TASKS);
+ if (log.isDebugEnabled()) log.debug("Add {} blocked tasks to process", blockedTasks.size());
heads.addAll(blockedTasks.values());
blockedTasks.clear(); // clear it now; may get refilled below.
//If we have enough items in the blocked tasks already, it makes
@@ -222,21 +210,20 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
if (heads.size() < MAX_BLOCKED_TASKS) {
//instead of reading MAX_PARALLEL_TASKS items always, we should only fetch as much as we can execute
int toFetch = Math.min(MAX_BLOCKED_TASKS - heads.size(), MAX_PARALLEL_TASKS - runningTasksSize());
- List<QueueEvent> newTasks = workQueue.peekTopN(toFetch, excludedTasks, 1500);
+ if (log.isDebugEnabled()) log.debug("PeekTopN for {} items", toFetch);
+ List<QueueEvent> newTasks = workQueue.peekTopN(toFetch, excludedTasks, 5000);
if (log.isDebugEnabled()) log.debug("Got {} tasks from work-queue : [{}]", newTasks.size(), newTasks);
heads.addAll(newTasks);
}
- if (isClosed) return;
-
taskBatch.batchId++;
for (QueueEvent head : heads) {
- if (runningZKTasks.contains(head.getId())) {
- log.warn("Task found in running ZKTasks already, continuing");
- continue;
- }
+// if (runningZKTasks.contains(head.getId())) {
+// log.warn("Task found in running ZKTasks already, continuing");
+// continue;
+// }
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
final String asyncId = message.getStr(ASYNC);
@@ -259,7 +246,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
OverseerMessageHandler.Lock lock = null;
lock = messageHandler.lockTask(message, taskBatch);
if (lock == null) {
- log.debug("Exclusivity check failed for [{}]", message.toString());
+ if (log.isDebugEnabled()) log.debug("Exclusivity check failed for [{}]", message.toString());
// we may end crossing the size of the MAX_BLOCKED_TASKS. They are fine
if (blockedTasks.size() < MAX_BLOCKED_TASKS) blockedTasks.put(head.getId(), head);
continue;
@@ -300,23 +287,10 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
}
private int runningTasksSize() {
- if (log.isDebugEnabled()) {
- log.debug("runningTasksSize() - start");
- }
-
- int returnint = runningTasks.size();
- if (log.isDebugEnabled()) {
- log.debug("runningTasksSize() - end");
- }
- return returnint;
-
+ return runningTasks.size();
}
private void cleanUpWorkQueue() throws KeeperException, InterruptedException {
- if (log.isDebugEnabled()) {
- log.debug("cleanUpWorkQueue() - start");
- }
-
Set<Map.Entry<String, QueueEvent>> entrySet = completedTasks.entrySet();
AtomicBoolean sessionExpired = new AtomicBoolean();
AtomicBoolean interrupted = new AtomicBoolean();
@@ -345,10 +319,6 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
if (sessionExpired.get()) {
throw new KeeperException.SessionExpiredException();
}
-
- if (log.isDebugEnabled()) {
- log.debug("cleanUpWorkQueue() - end");
- }
}
public void closing() {
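The repeated if (log.isDebugEnabled()) guards added above keep argument rendering (for example the toString() of the full runningTasks and blockedTasks collections) out of the normal path when debug logging is disabled. A minimal SLF4J-style sketch of that pattern; the class and collection names are illustrative only:

    import java.util.Set;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class DebugGuardSketch {
      private static final Logger log = LoggerFactory.getLogger(DebugGuardSketch.class);

      static boolean isExcluded(String id, Set<String> runningTasks, Set<String> blockedTasks) {
        boolean contains = runningTasks.contains(id) || blockedTasks.contains(id);
        // The guard means the potentially large collections are only formatted
        // into the log message when debug logging is actually enabled.
        if (log.isDebugEnabled()) log.debug("test {} against {}, {} : {}", id, runningTasks, blockedTasks, contains);
        return contains;
      }
    }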
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index d9551d8..20a3332 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -23,7 +23,6 @@ import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
@@ -268,15 +267,16 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
public List<QueueEvent> peekTopN(int n, Predicate<String> excludeSet, long waitMillis)
throws KeeperException, InterruptedException {
+ if (log.isDebugEnabled()) log.debug("peekTopN {} {}", n, excludeSet);
ArrayList<QueueEvent> topN = new ArrayList<>();
- if (log.isDebugEnabled()) log.debug("Peeking for top {} elements. ExcludeSet: {}", n, excludeSet);
Timer.Context time;
if (waitMillis == Long.MAX_VALUE) time = stats.time(dir + "_peekTopN_wait_forever");
else time = stats.time(dir + "_peekTopN_wait" + waitMillis);
try {
- for (Pair<String, byte[]> element : peekElements(n, waitMillis, child -> !excludeSet.test(dir + "/" + child))) {
+ for (Pair<String, byte[]> element : peekElements(n, waitMillis, excludeSet)) {
+ if (log.isDebugEnabled()) log.debug("Add to topN {}", dir + "/" + element.first());
topN.add(new QueueEvent(dir + "/" + element.first(),
element.second(), null));
}
@@ -305,7 +305,7 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
*/
public String getTailId() throws KeeperException, InterruptedException {
// TODO: could we use getChildren here? Unsure what freshness guarantee the caller needs.
- TreeSet<String> orderedChildren = fetchZkChildren(null);
+ TreeSet<String> orderedChildren = fetchZkChildren(null, null);
for (String headNode : orderedChildren.descendingSet())
if (headNode != null) {
@@ -346,6 +346,7 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
private volatile byte[] bytes;
QueueEvent(String id, byte[] bytes, WatchedEvent event) {
+ if (log.isDebugEnabled()) log.debug("Create QueueEvent with id {}", id);
this.id = id;
this.bytes = bytes;
this.event = event;
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index 9e0cda1..2e69314 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -125,7 +125,9 @@ class ShardLeaderElectionContextBase extends ElectionContext {
}
} else {
try {
- zkClient.delete(leaderSeqPath, -1);
+ if (leaderSeqPath != null) {
+ zkClient.delete(leaderSeqPath, -1);
+ }
} catch (NoNodeException e) {
// fine
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
index c8da9c4..8351b96 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
@@ -24,17 +24,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
@@ -128,26 +127,14 @@ public class ZkDistributedQueue implements DistributedQueue {
public ZkDistributedQueue(SolrZkClient zookeeper, String dir, Stats stats, int maxQueueSize, IsClosed higherLevelIsClosed) {
this.dir = dir;
- // ZkCmdExecutor cmdExecutor = new ZkCmdExecutor(zookeeper.getZkClientTimeout(), higherLevelIsClosed);
- // try {
- // cmdExecutor.ensureExists(dir, zookeeper);
- // } catch (KeeperException e) {
- // throw new SolrException(ErrorCode.SERVER_ERROR, e);
- // } catch (InterruptedException e) {
- // Thread.currentThread().interrupt();
- // throw new SolrException(ErrorCode.SERVER_ERROR, e);
- // }
-
this.zookeeper = zookeeper;
this.stats = stats;
this.maxQueueSize = maxQueueSize;
- Watcher watcher = new ChildWatcher();
-
try {
try {
updateLock.lockInterruptibly();
- knownChildren = fetchZkChildren(watcher);
+ knownChildren = fetchZkChildren(null, null);
} catch (KeeperException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
} catch (InterruptedException e) {
@@ -198,33 +185,39 @@ public class ZkDistributedQueue implements DistributedQueue {
*/
@Override
public byte[] peek(long wait) throws KeeperException, InterruptedException {
- Preconditions.checkArgument(wait > 0);
- Timer.Context time;
- if (wait == Long.MAX_VALUE) {
- time = stats.time(dir + "_peek_wait_forever");
- } else {
- time = stats.time(dir + "_peek_wait" + wait);
- }
- updateLock.lockInterruptibly();
+ byte[] result = null;
try {
+ Timer.Context time;
+ if (wait == Long.MAX_VALUE) {
+ time = stats.time(dir + "_peek_wait_forever");
+ } else {
+ time = stats.time(dir + "_peek_wait" + wait);
+ }
+
long waitNanos = TimeUnit.MILLISECONDS.toNanos(wait);
- while (waitNanos > 0) {
- byte[] result = firstElement();
- if (result != null) {
- return result;
- }
- TreeSet<String> existingChildren = knownChildren;
- while (existingChildren == knownChildren) {
- changed.await(500, TimeUnit.MILLISECONDS);
- }
+ result = firstElement();
+ if (result != null) {
+ return result;
}
- return null;
+
+ ChildWatcher watcher = new ChildWatcher();
+ TreeSet<String> foundChildren = fetchZkChildren(watcher, null);
+
+ TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
+
+ waitForChildren(null, foundChildren, waitNanos, timeout, watcher);
+ if (foundChildren.size() == 0) {
+ return null;
+ }
+ result = firstElement();
+ return result;
} finally {
- if (updateLock.isHeldByCurrentThread()) {
- updateLock.unlock();
+ try {
+ zookeeper.getSolrZooKeeper().removeAllWatches(dir, Watcher.WatcherType.Children, false);
+ } catch (Exception e) {
+ log.info(e.getMessage());
}
- time.stop();
}
}
@@ -264,6 +257,7 @@ public class ZkDistributedQueue implements DistributedQueue {
}
public void remove(Collection<String> paths) throws KeeperException, InterruptedException {
+ if (log.isDebugEnabled()) log.debug("Remove paths from queue {} {}", dir, paths);
if (paths.isEmpty()) return;
List<Op> ops = new ArrayList<>();
for (String path : paths) {
@@ -289,6 +283,16 @@ public class ZkDistributedQueue implements DistributedQueue {
}
}
}
+ updateLock.lockInterruptibly();
+ try {
+ for (String path : paths) {
+ knownChildren.remove(dir + "/" + path);
+ }
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
+ }
}
/**
@@ -302,16 +306,26 @@ public class ZkDistributedQueue implements DistributedQueue {
Timer.Context timer = stats.time(dir + "_take");
updateLock.lockInterruptibly();
try {
- while (true) {
- byte[] result = removeFirst();
- if (result != null) {
- return result;
- }
- TreeSet<String> existingChildren = knownChildren;
+ long waitNanos = TimeUnit.MILLISECONDS.toNanos(60000);
- while (existingChildren == knownChildren) {
- changed.await(500, TimeUnit.MILLISECONDS);
- }
+ byte[] result = removeFirst();
+ if (result != null) {
+ return result;
+ }
+
+ ChildWatcher watcher = new ChildWatcher();
+ TreeSet<String> foundChildren = fetchZkChildren(watcher, null);
+
+ TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
+
+ waitForChildren(null, foundChildren, waitNanos, timeout, watcher);
+ if (foundChildren.size() == 0) {
+ return null;
+ }
+
+ result = removeFirst();
+ if (result != null) {
+ return result;
}
} finally {
if (updateLock.isHeldByCurrentThread()) {
@@ -319,6 +333,7 @@ public class ZkDistributedQueue implements DistributedQueue {
}
timer.stop();
}
+ return null;
}
/**
@@ -427,18 +442,25 @@ public class ZkDistributedQueue implements DistributedQueue {
/**
* Return the current set of children from ZK; does not change internal state.
*/
- TreeSet<String> fetchZkChildren(Watcher watcher) throws KeeperException {
+ TreeSet<String> fetchZkChildren(Watcher watcher, Predicate<String> acceptFilter) throws KeeperException {
TreeSet<String> orderedChildren = new TreeSet<>();
try {
List<String> childNames = zookeeper.getChildren(dir, watcher, true);
stats.setQueueLength(childNames.size());
for (String childName : childNames) {
+ if (log.isDebugEnabled()) log.debug("Examine child: {} out of {} {}", childName, childNames.size(), acceptFilter);
// Check format
if (!childName.regionMatches(0, PREFIX, 0, PREFIX.length())) {
+
// responses can be written to same queue with different naming scheme
if (log.isDebugEnabled()) log.debug("Found child node with improper name: {}", childName);
continue;
}
+ if (acceptFilter != null && acceptFilter.test(dir + "/" + childName)) {
+ if (log.isDebugEnabled()) log.debug("Found child that matched exclude filter: {}", dir + "/" + childName);
+ continue;
+ }
+ if (log.isDebugEnabled()) log.debug("Add child to fetched children: {}", childName);
orderedChildren.add(childName);
}
return orderedChildren;
@@ -456,69 +478,40 @@ public class ZkDistributedQueue implements DistributedQueue {
*/
@Override
public Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws KeeperException {
- List<String> foundChildren = new ArrayList<>();
+ if (log.isDebugEnabled()) log.debug("peekElements {} {}", max, acceptFilter);
+ List<Pair<String,byte[]>> result = null;
+ ChildWatcher watcher = new ChildWatcher();
+ TreeSet<String> foundChildren = fetchZkChildren(watcher, acceptFilter);
long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
try {
- while (true) {
- // Trigger a refresh, but only force it if this is not the first iteration.
- //firstChild(false, !first);
-
- updateLock.lockInterruptibly();
- try {
- for (String child : knownChildren) {
- if (acceptFilter.test(child)) {
- foundChildren.add(child);
- }
- }
- if (!foundChildren.isEmpty()) {
- break;
- }
- if (waitNanos <= 0) {
- break;
- }
-
- while (foundChildren.size() == 0) {
- try {
- changed.await(250, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
- throw new AlreadyClosedException();
- }
- if (timeout.hasTimedOut() || zookeeper.isClosed() || !zookeeper.isConnected()) {
- return Collections.emptyList();
- }
-
- for (String child : knownChildren) {
- if (acceptFilter.test(child)) {
- foundChildren.add(child);
- }
- }
-
- }
- } finally {
- if (updateLock.isHeldByCurrentThread()) {
- updateLock.unlock();
- }
- }
-
- if (!foundChildren.isEmpty()) {
- break;
- }
+ if (foundChildren.size() == 0) {
+ waitForChildren(acceptFilter, foundChildren, waitNanos, timeout, watcher);
}
- // Technically we could restart the method if we fail to actually obtain any valid children
+ // Technically we could restart the method if we fasil to actually obtain any valid children
// from ZK, but this is a super rare case, and the latency of the ZK fetches would require
// much more sophisticated waitNanos tracking.
- List<Pair<String,byte[]>> result = new ArrayList<>(foundChildren.size());
+ result = new ArrayList<>(foundChildren.size());
for (String child : foundChildren) {
if (result.size() >= max) {
break;
}
+
try {
byte[] data = zookeeper.getData(dir + "/" + child, null, null, true);
+ if (log.isDebugEnabled()) log.debug("get data for child={}", child);
result.add(new Pair<>(child, data));
} catch (KeeperException.NoNodeException e) {
+ if (log.isDebugEnabled()) log.debug("no node found for child={}", child);
+ updateLock.lockInterruptibly();
+ try {
+ knownChildren.remove(child);
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
+ }
continue;
}
}
@@ -526,6 +519,72 @@ public class ZkDistributedQueue implements DistributedQueue {
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
throw new AlreadyClosedException(e);
+ } finally {
+ try {
+ zookeeper.getSolrZooKeeper().removeAllWatches(dir, Watcher.WatcherType.Children, false);
+ } catch (Exception e) {
+ log.info(e.getMessage());
+ }
+ }
+ }
+
+ private void waitForChildren(Predicate<String> acceptFilter, TreeSet<String> foundChildren, long waitNanos, TimeOut timeout, ChildWatcher watcher) throws InterruptedException, KeeperException {
+ if (log.isDebugEnabled()) log.debug("wait for children ... {}", waitNanos);
+ updateLock.lockInterruptibly();
+ try {
+ for (String child : knownChildren) {
+ if (acceptFilter == null || !acceptFilter.test(dir + "/" + child)) {
+ foundChildren.add(child);
+ }
+ }
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
+ }
+ if (!foundChildren.isEmpty()) {
+ return;
+ }
+ if (waitNanos <= 0) {
+ if (log.isDebugEnabled()) log.debug("0 wait time and no children found, return");
+ return;
+ }
+
+ while (foundChildren.size() == 0) {
+ if (watcher.fired) {
+ watcher.fired = false;
+ foundChildren = fetchZkChildren(watcher, acceptFilter);
+ if (!foundChildren.isEmpty()) {
+ break;
+ }
+ }
+ updateLock.lockInterruptibly();
+ try {
+ try {
+ changed.await(250, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e);
+ }
+ if (zookeeper.isClosed() || !zookeeper.isConnected()) {
+ throw new AlreadyClosedException();
+ }
+ if (timeout.hasTimedOut()) {
+ return;
+ }
+ for (String child : knownChildren) {
+ if (acceptFilter == null || !acceptFilter.test(dir + "/" + child)) {
+ foundChildren.add(child);
+ }
+ }
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
+ }
+ if (!foundChildren.isEmpty()) {
+ break;
+ }
+
}
}
@@ -536,7 +595,7 @@ public class ZkDistributedQueue implements DistributedQueue {
*/
private byte[] firstElement() throws KeeperException {
try {
- while (true) {
+
String firstChild = null;
firstChild = firstChild(false);
if (firstChild == null) {
@@ -547,7 +606,7 @@ public class ZkDistributedQueue implements DistributedQueue {
} catch (KeeperException.NoNodeException e) {
return null;
}
- }
+
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
throw new AlreadyClosedException(e);
@@ -556,20 +615,25 @@ public class ZkDistributedQueue implements DistributedQueue {
private byte[] removeFirst() throws KeeperException {
try {
- while (true) {
- String firstChild = firstChild(true);
- if (firstChild == null) {
- return null;
- }
+ String firstChild = firstChild(true);
+ if (firstChild == null) {
+ return null;
+ }
+ try {
+ String path = dir + "/" + firstChild;
+ byte[] result = zookeeper.getData(path, null, null, true);
+ zookeeper.delete(path, -1, true);
+ updateLock.lockInterruptibly();
try {
- String path = dir + "/" + firstChild;
- byte[] result = zookeeper.getData(path, null, null, true);
- zookeeper.delete(path, -1, true);
- // stats.setQueueLength(knownChildren.size());
- return result;
- } catch (KeeperException.NoNodeException e) {
- return null;
+ knownChildren.remove(path);
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
}
+ return result;
+ } catch (KeeperException.NoNodeException e) {
+ return null;
}
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
@@ -582,6 +646,7 @@ public class ZkDistributedQueue implements DistributedQueue {
}
@VisibleForTesting class ChildWatcher implements Watcher {
+ volatile boolean fired = false;
@Override
public void process(WatchedEvent event) {
@@ -592,16 +657,17 @@ public class ZkDistributedQueue implements DistributedQueue {
if (log.isDebugEnabled()) log.debug("DistributedQueue changed {} {}", event.getPath(), event.getType());
// nocommit - all the nodes are watching this currently instead of just the Overseer
-
- updateLock.lock();
- try {
- knownChildren = fetchZkChildren(this);
-
- changed.signalAll();
- } catch (KeeperException e) {
- log.error("", e);
- } finally {
- updateLock.unlock();
+ if (event.getType() == Event.EventType.NodeChildrenChanged) {
+ updateLock.lock();
+ try {
+ knownChildren = fetchZkChildren(null, null);
+ fired = true;
+ changed.signalAll();
+ } catch (KeeperException e) {
+ log.error("", e);
+ } finally {
+ updateLock.unlock();
+ }
}
}
}
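The ZkDistributedQueue rework above replaces the lock-and-poll loops in peek(), take() and peekElements() with a common shape: fetch the children with a watcher attached, and if nothing usable is found, wait on the changed condition in short slices, re-fetching whenever the watcher reports a NodeChildrenChanged event, until something arrives or the overall timeout expires; the watch is then removed in a finally block. A minimal sketch of that wait loop using plain java.util.concurrent, with a hypothetical fetchChildren() standing in for the ZooKeeper read and a volatile flag standing in for the ChildWatcher:

    import java.util.TreeSet;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    public class WaitForChildrenSketch {
      private final ReentrantLock updateLock = new ReentrantLock();
      private final Condition changed = updateLock.newCondition();
      private volatile boolean watcherFired = false; // set by the (hypothetical) child watcher

      // Hypothetical stand-in for fetchZkChildren(watcher, filter).
      private TreeSet<String> fetchChildren() {
        return new TreeSet<>();
      }

      TreeSet<String> waitForChildren(long waitMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitMillis);
        TreeSet<String> found = fetchChildren();
        while (found.isEmpty()) {
          if (watcherFired) { // the watcher saw a child change
            watcherFired = false;
            found = fetchChildren(); // re-read instead of trusting cached state
            if (!found.isEmpty()) break;
          }
          updateLock.lockInterruptibly();
          try {
            // Wait in short slices so the timeout is checked regularly.
            changed.await(250, TimeUnit.MILLISECONDS);
          } finally {
            if (updateLock.isHeldByCurrentThread()) updateLock.unlock();
          }
          if (System.nanoTime() >= deadline) break; // give up when the overall wait expires
        }
        return found;
      }
    }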
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index e94229a..bb8ee8a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -314,7 +314,8 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
ShardRequest sreq = e.getValue();
Replica replica = null;
for (Replica rep : replicas.values()) {
- if (rep.getCoreName().equals(sreq.params.get(CoreAdminParams.NAME)) && rep.getBaseUrl().equals(sreq.shards[0])) {
+ log.info("cmp {} {} {} {}", e.getKey(), sreq.shards[0], rep.getCoreName(), rep.getBaseUrl());
+ if (rep.getCoreName().equals(e.getKey()) && rep.getBaseUrl().equals(sreq.shards[0])) {
sreq.params.set(CoreAdminParams.CORE_NODE_NAME, rep.getName());
replica = rep;
break;
@@ -322,8 +323,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
}
if (sreq.params.get(CoreAdminParams.CORE_NODE_NAME) == null || replica == null) {
- throw new IllegalStateException(
- "No core node name found for " + e.getKey() + " replica=" + replica + " positions:" + replicaPositions.size() + " cores:" + coresToCreate.size() + " replicas:" + replicas.size());
+ continue;
}
log.info("Submit request to shard for for replica={}", sreq.actualShards != null ? Arrays.asList(sreq.actualShards) : "null");
@@ -794,6 +794,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
}
}
if (replicas == expectedReplicas) {
+ log.info("Found expected replicas={} {}", expectedReplicas, replicaMap);
return true;
}
return false;
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
index 993d394..47e8244 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
@@ -58,10 +58,10 @@ public class ClusterStateMutator {
public ZkWriteCommand createCollection(ClusterState clusterState, ZkNodeProps message) {
String cName = message.getStr(NAME);
if (log.isDebugEnabled()) log.debug("building a new cName: " + cName);
- if (clusterState.hasCollection(cName)) {
- log.warn("Collection {} already exists. exit", cName);
- return ZkStateWriter.NO_OP;
- }
+// if (clusterState.hasCollection(cName)) {
+// log.warn("Collection {} already exists. exit", cName);
+// return ZkStateWriter.NO_OP;
+// }
Map<String, Object> routerSpec = DocRouter.getRouterSpec(message);
String routerName = routerSpec.get(NAME) == null ? DocRouter.DEFAULT_NAME : (String) routerSpec.get(NAME);
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 1233de6..530d335 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -100,7 +100,7 @@ public class ZkStateWriter {
public ClusterState enqueueUpdate(ClusterState state, List<ZkWriteCommand> cmds, ZkWriteCallback callback) throws IllegalStateException, Exception {
if (log.isDebugEnabled()) {
// nocommit trace?
- //log.debug("enqueueUpdate(ClusterState prevState={}, List<ZkWriteCommand> cmds={}, updates={}, ZkWriteCallback callback={}) - start", state, cmds, updatesToWrite, callback);
+ log.debug("enqueueUpdate(ClusterState prevState={}, List<ZkWriteCommand> cmds={}, updates={}, ZkWriteCallback callback={}) - start", state, cmds, updatesToWrite, callback);
}
Map<String,DocCollection> updateCmds = new LinkedHashMap<>(cmds.size());
@@ -189,6 +189,9 @@ public class ZkStateWriter {
}
}
+ if (callback != null) {
+ callback.onWrite();
+ }
if (log.isDebugEnabled()) {
log.debug("enqueueUpdate(ClusterState, List<ZkWriteCommand>, ZkWriteCallback) - end");
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index b495106..9c5b9f3 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -2451,7 +2451,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
if (currSearcher == null) {
future = searcherExecutor.submit(() -> {
- try (ParWork work = new ParWork(this, false)) {
+ try (ParWork work = new ParWork(this, true, true)) {
for (SolrEventListener listener : firstSearcherListeners) {
work.collect("fistSearcherListeners", () -> {
listener.newSearcher(newSearcher, null);
@@ -2464,7 +2464,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
if (currSearcher != null) {
future = searcherExecutor.submit(() -> {
- try (ParWork work = new ParWork(this, false)) {
+ try (ParWork work = new ParWork(this, true, true)) {
for (SolrEventListener listener : newSearcherListeners) {
work.collect("newSearcherListeners", () -> {
listener.newSearcher(newSearcher, null);
@@ -2845,7 +2845,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
m.put("xlsx",
(QueryResponseWriter) Class.forName("org.apache.solr.handler.extraction.XLSXResponseWriter").getConstructor().newInstance());
} catch (Exception e) {
- ParWork.propagateInterrupt(e, true);
+ ParWork.propagateInterrupt("XLSXResponseWriter from extraction contrib not found on classpath", null, true);
//don't worry; solrcell contrib not in class path
}
}
diff --git a/solr/core/src/java/org/apache/solr/update/CommitTracker.java b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
index 600c3e9..bd2412b 100644
--- a/solr/core/src/java/org/apache/solr/update/CommitTracker.java
+++ b/solr/core/src/java/org/apache/solr/update/CommitTracker.java
@@ -111,7 +111,12 @@ public final class CommitTracker implements Runnable, Closeable {
public void close() {
- lock.lock();
+ try {
+ lock.lockInterruptibly();
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e);
+ return;
+ }
try {
this.closed = true;
try {
@@ -122,7 +127,7 @@ public final class CommitTracker implements Runnable, Closeable {
pending = null;
ParWork.close(scheduler);
} finally {
- lock.unlock();
+ if (lock.isHeldByCurrentThread()) lock.unlock();
}
assert ObjectReleaseTracker.release(this);
}
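The CommitTracker.close() change above swaps lock() for lockInterruptibly() so an interrupt during shutdown is propagated instead of being ignored, and the unlock only happens when the lock is actually held (it may not be if the acquire was interrupted). A minimal sketch of that close pattern with a plain ReentrantLock; the field names are illustrative:

    import java.util.concurrent.locks.ReentrantLock;

    public class InterruptibleCloseSketch {
      private final ReentrantLock lock = new ReentrantLock();
      private volatile boolean closed;

      public void close() {
        try {
          // Acquire interruptibly: if the closing thread is interrupted, restore
          // the interrupt flag and bail out instead of blocking indefinitely.
          lock.lockInterruptibly();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        try {
          closed = true;
          // ... cancel pending commits, shut down the scheduler ...
        } finally {
          // Only unlock when this thread actually holds the lock.
          if (lock.isHeldByCurrentThread()) lock.unlock();
        }
      }
    }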
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index 74f49df..fb14c57 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -806,7 +806,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
@Override
public void close() throws IOException {
- log.debug("closing {}", this);
+ if (log.isDebugEnabled()) log.debug("closing {}", this);
try (ParWork closer = new ParWork(this, true, true)) {
closer.collect(commitTracker);
closer.collect(softCommitTracker);
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/AbstractCloudBackupRestoreTestCase.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/AbstractCloudBackupRestoreTestCase.java
index e1fa598..ae3da25 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/AbstractCloudBackupRestoreTestCase.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/AbstractCloudBackupRestoreTestCase.java
@@ -381,6 +381,9 @@ public abstract class AbstractCloudBackupRestoreTestCase extends SolrCloudTestCa
assertEquals(RequestStatusState.COMPLETED, restore.processAndWait(client, 60));//async
}
+ // TODO: the restore call should do better at waiting here for youz
+ cluster.waitForActiveCollection(collectionName, NUM_SHARDS, NUM_SHARDS * (replFactor + numTlogReplicas + numPullReplicas));
+
//Check the number of results are the same
DocCollection restoreCollection = client.getZkStateReader().getClusterState().getCollection(restoreCollectionName);
assertEquals(origShardToDocCount, getShardToDocCountMap(client, restoreCollection));
@@ -439,7 +442,7 @@ public abstract class AbstractCloudBackupRestoreTestCase extends SolrCloudTestCa
Map<String,Integer> shardToDocCount = new TreeMap<>();
for (Slice slice : docCollection.getActiveSlices()) {
String shardName = slice.getName();
- try (Http2SolrClient leaderClient = new Http2SolrClient.Builder(slice.getLeader().getCoreUrl()).withHttpClient(client.getHttpClient()).build()) {
+ try (Http2SolrClient leaderClient = new Http2SolrClient.Builder(slice.getReplicas().iterator().next().getCoreUrl()).withHttpClient(client.getHttpClient()).build()) {
long docsInShard = leaderClient.query(new SolrQuery("*:*").setParam("distrib", "false"))
.getResults().getNumFound();
shardToDocCount.put(shardName, (int) docsInShard);
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
index 93a8793..ade2945 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/ShardSplitTest.java
@@ -117,7 +117,7 @@ public class ShardSplitTest extends SolrCloudBridgeTestCase {
Add a replica. Ensure count matches in leader and replica.
*/
@Test
- @Ignore // nocommit
+ @Nightly // TODO: look at speeding this up
public void testSplitStaticIndexReplication() throws Exception {
doSplitStaticIndexReplication(SolrIndexSplitter.SplitMethod.REWRITE);
}
@@ -198,7 +198,7 @@ public class ShardSplitTest extends SolrCloudBridgeTestCase {
fail("We could not find a jetty to kill for replica: " + replica.getCoreUrl());
}
- cloudClient.waitForState(collectionName, 30, TimeUnit.SECONDS, SolrCloudTestCase.activeClusterShape(1, 1));
+ cloudClient.waitForState(collectionName, 30, TimeUnit.SECONDS, SolrCloudTestCase.activeClusterShape(2, 2));
// add a new replica for the sub-shard
CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest.addReplicaToShard(collectionName, SHARD1_0);
@@ -209,7 +209,7 @@ public class ShardSplitTest extends SolrCloudBridgeTestCase {
state = addReplica.processAndWait(cloudClient, 30);
// }
- cloudClient.waitForState(collectionName, 30, TimeUnit.SECONDS, SolrCloudTestCase.activeClusterShape(2, 3));
+ cluster.waitForActiveCollection(collectionName, 30, TimeUnit.SECONDS,2, 3);
if (state == RequestStatusState.COMPLETED) {
CountDownLatch newReplicaLatch = new CountDownLatch(1);
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestLocalFSCloudBackupRestore.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestLocalFSCloudBackupRestore.java
index 5efe7b8..c1899db 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestLocalFSCloudBackupRestore.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestLocalFSCloudBackupRestore.java
@@ -42,7 +42,7 @@ import org.junit.Test;
* Solr backup/restore still requires a "shared" file-system. Its just that in this case such file-system would be
* exposed via local file-system API.
*/
-@Ignore // nocommit can hang
+@Ignore // nocommit
public class TestLocalFSCloudBackupRestore extends AbstractCloudBackupRestoreTestCase {
private static String backupLocation;
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 69125c8..8d1a834 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -640,7 +640,7 @@ public class ParWork implements Closeable {
Thread.currentThread().interrupt();
} else {
if (infoLogMsg) {
- log.info(t.getClass().getName() + " " + t.getMessage());
+ log.info(t.getClass().getName() + " " + t.getMessage(), t);
} else {
log.warn("Solr ran into an unexpected exception", t);
}
@@ -656,7 +656,7 @@ public class ParWork implements Closeable {
}
public static void propagateInterrupt(String msg, Throwable t, boolean infoLogMsg) {
- if (t instanceof InterruptedException) {
+ if (t != null && t instanceof InterruptedException) {
log.info("Interrupted", t);
Thread.currentThread().interrupt();
} else {
@@ -666,7 +666,7 @@ public class ParWork implements Closeable {
log.warn(msg, t);
}
}
- if (t instanceof Error) {
+ if (t != null && t instanceof Error) {
throw (Error) t;
}
}
diff --git a/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml b/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml
index ffff946..94271d3 100644
--- a/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml
+++ b/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml
@@ -23,10 +23,20 @@
<PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-5p}{6}} [%style{%X{node_name} %X{collection} %X{shard} %X{replica} %X{trace_id}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
</Console>
- <File name="FILE" fileName="solr-test.log" immediateFlush="false" append="false">
+ <File name="FILE" fileName="${sys:user.home}/solr-test.log" immediateFlush="false" append="false">
<PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-5p}{6}} %style{(%t)}{yellow,bold} [%style{%X{node_name} %X{collection} %X{shard} %X{replica} %X{trace_id}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
</File>
+ <File name="FILE2" fileName="${sys:user.home}/solr-test.log" immediateFlush="false" append="false">
+ <PatternLayout>
+ <Pattern>
+ %maxLen{%-4r %-5p (%t) [%X{node_name} %X{collection} %X{shard} %X{replica} %X{core} %X{trace_id}] %c{1.} %m%notEmpty{
+ =>%ex{short}}}{10240}%n
+ </Pattern>
+ </PatternLayout>
+ </File>
+
+
</Appenders>
<Loggers>
<AsyncLogger name="org.apache.zookeeper" level="WARN"/>
@@ -60,7 +70,7 @@
<AsyncRoot level="INFO">
<AppenderRef ref="STDERR_COLOR"/>
- <!-- <AppenderRef ref="FILE"/> -->
+ <AppenderRef ref="FILE2"/>
</AsyncRoot>
</Loggers>
</Configuration>
diff --git a/solr/test-framework/src/resources/logconf/log4j2-std-debug.xml b/solr/test-framework/src/resources/logconf/log4j2-std-debug.xml
index aa90622..3f09f69 100644
--- a/solr/test-framework/src/resources/logconf/log4j2-std-debug.xml
+++ b/solr/test-framework/src/resources/logconf/log4j2-std-debug.xml
@@ -23,7 +23,7 @@
<PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-5p}{6}} %style{(%t)}{yellow,bold} [%style{%X{node_name} %X{collection} %X{shard} %X{replica} %X{core} %X{trace_id}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
</Console>
- <File name="FILE" fileName="solr-test.log" immediateFlush="false" append="false">
+ <File name="FILE" fileName="${sys:user.home}/solr-test.log" immediateFlush="false" append="false">
<PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-5p}{6}} %style{(%t)}{yellow,bold} [%style{%X{node_name} %X{collection} %X{shard} %X{replica} %X{core} %X{trace_id}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
</File>