You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/10/31 04:10:32 UTC
[lucene-solr] branch reference_impl_dev updated: @1096 Go Overseer,
plus ultra.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new 11fc487 @1096 Go Overseer, plus ultra.
11fc487 is described below
commit 11fc487d6107a90586f95476940e92c9c5b24792
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Oct 30 23:10:09 2020 -0500
@1096 Go Overseer, plus ultra.
---
.../src/java/org/apache/solr/cloud/Overseer.java | 7 +-
.../apache/solr/cloud/OverseerTaskProcessor.java | 31 +-
.../org/apache/solr/cloud/OverseerTaskQueue.java | 108 ++++--
.../org/apache/solr/cloud/ZkDistributedQueue.java | 394 ++++++++++++++-------
.../org/apache/solr/common/cloud/SolrZkClient.java | 38 ++
5 files changed, 415 insertions(+), 163 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index a277a42..0fc4350 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -17,6 +17,7 @@
package org.apache.solr.cloud;
import java.io.Closeable;
+import java.io.File;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
@@ -315,7 +316,7 @@ public class Overseer implements SolrCloseable {
// We do not need to filter any nodes here cause all processed nodes are removed once we flush clusterstate
long wait = 5000;
- queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, wait, (x) -> false));
+ queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, wait, (x) -> x.startsWith(OverseerTaskQueue.RESPONSE_PREFIX)));
} catch (AlreadyClosedException e) {
if (isClosed()) {
log.info("Overseer closed (AlreadyClosedException), exiting loop");
@@ -350,7 +351,7 @@ public class Overseer implements SolrCloseable {
final ZkNodeProps message = ZkNodeProps.load(data);
if (log.isDebugEnabled()) log.debug("processMessage: queueSize: {}, message = {}", stateUpdateQueue.getZkStats().getQueueLength(), message);
if (log.isDebugEnabled()) log.debug("add processed node: {}, processedNodes = {}", head.first(), stateUpdateQueue.getZkStats().getQueueLength(), processedNodes);
- processedNodes.add(head.first());
+ processedNodes.add(new File(head.first()).getName());
// The callback always be called on this thread
processQueueItem(message, reader.getClusterState(), zkStateWriter, true, null);
}
@@ -361,7 +362,7 @@ public class Overseer implements SolrCloseable {
// if an event comes in the next *ms batch it together
int wait = 10;
if (log.isDebugEnabled()) log.debug("going to peekElements processedNodes={}", processedNodes);
- queue = new LinkedList<>(stateUpdateQueue.peekElements(10, wait, node -> processedNodes.contains(node)));
+ queue = new LinkedList<>(stateUpdateQueue.peekElements(10, wait, node -> processedNodes.contains(node) || node.startsWith(OverseerTaskQueue.RESPONSE_PREFIX)));
}
fallbackQueueSize = processedNodes.size();
// we should force write all pending updates because the next iteration might sleep until there
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index 6888db1..5c0f33c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -100,10 +100,14 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
// This is an optimization to ensure that we do not read the same tasks
// again and again from ZK.
final private Map<String, QueueEvent> blockedTasks = new ConcurrentSkipListMap<>();
- final private Predicate<String> excludedTasks = new Predicate<String>() {
+ final private Predicate<String> excludedTasks = new Predicate<>() {
@Override
public boolean test(String s) {
// nocommit
+ if (s.startsWith(OverseerTaskQueue.RESPONSE_PREFIX)) {
+ if (log.isDebugEnabled()) log.debug("exclude {} due to prefix {}", s, OverseerTaskQueue.RESPONSE_PREFIX);
+ return true;
+ }
boolean contains = runningTasks.contains(s) || blockedTasks.containsKey(s) || runningZKTasks.contains(s);
if (log.isDebugEnabled()) log.debug("test {} against {}, {}, {} : {}", s, runningTasks, blockedTasks, runningZKTasks, contains);
@@ -220,17 +224,22 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
for (QueueEvent head : heads) {
-// if (runningZKTasks.contains(head.getId())) {
-// log.warn("Task found in running ZKTasks already, continuing");
-// continue;
-// }
+ if (runningZKTasks.contains(head.getId())) {
+ log.warn("Task found in running ZKTasks already, continuing");
+ continue;
+ }
+
+ if (head.getBytes() == null) {
+ log.info("Found empty entry id={} event={}", head.getId(), head.getWatchedEvent());
+ continue;
+ }
final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
final String asyncId = message.getStr(ASYNC);
if (hasLeftOverItems) {
if (head.getId().equals(oldestItemInWorkQueue)) hasLeftOverItems = false;
if (asyncId != null && (completedMap.contains(asyncId) || failureMap.contains(asyncId))) {
- log.debug("Found already processed task in workQueue, cleaning up. AsyncId [{}]", asyncId);
+ if (log.isDebugEnabled()) log.debug("Found already processed task in workQueue, cleaning up. AsyncId [{}]", asyncId);
workQueue.remove(head);
continue;
}
@@ -294,9 +303,10 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
Set<Map.Entry<String, QueueEvent>> entrySet = completedTasks.entrySet();
AtomicBoolean sessionExpired = new AtomicBoolean();
AtomicBoolean interrupted = new AtomicBoolean();
+ // TODO: async
try (ParWork work = new ParWork(this, true, true)) {
for (Map.Entry<String, QueueEvent> entry : entrySet) {
- work.collect("cleanWorkQueue", ()->{
+ work.collect("cleanWorkQueue", () -> {
try {
workQueue.remove(entry.getValue());
} catch (KeeperException.SessionExpiredException e) {
@@ -304,11 +314,12 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
} catch (InterruptedException e) {
interrupted.set(true);
} catch (KeeperException e) {
- log.error("Exception removing item from workQueue", e);
+ log.error("Exception removing item from workQueue", e);
}
- runningTasks.remove(entry.getKey());});
- completedTasks.remove(entry.getKey());
+ runningTasks.remove(entry.getKey());
+ });
}
+
}
if (interrupted.get()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index 9078f3c..38c019e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -16,9 +16,13 @@
*/
package org.apache.solr.cloud;
+import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
@@ -48,7 +52,7 @@ import org.slf4j.LoggerFactory;
public class OverseerTaskQueue extends ZkDistributedQueue {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private static final String RESPONSE_PREFIX = "qnr-" ;
+ public static final String RESPONSE_PREFIX = "qnr-" ;
private final LongAdder pendingResponses = new LongAdder();
@@ -76,28 +80,47 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
*/
public boolean containsTaskWithRequestId(String requestIdKey, String requestId)
throws KeeperException, InterruptedException {
+ // Scans the cached queue children for a task whose message maps requestIdKey to requestId.
+ // Data missing from the cache is read from ZK and cached; children that no longer exist in ZK
+ // are dropped from the cache.
+ // NOTE(review): updateLock is held across the synchronous ZK reads below.
+ Set<String> childNames;
+ updateLock.lockInterruptibly();
+ try {
- List<String> childNames = zookeeper.getChildren(dir, null, true);
- stats.setQueueLength(childNames.size());
- for (String childName : childNames) {
- if (childName != null && childName.startsWith(PREFIX)) {
- try {
- byte[] data = zookeeper.getData(dir + "/" + childName, null, null, true);
- if (data != null) {
- ZkNodeProps message = ZkNodeProps.load(data);
- if (message.containsKey(requestIdKey)) {
- if (log.isDebugEnabled()) {
- log.debug("Looking for {}, found {}", message.get(requestIdKey), requestId);
+ // Iterate over a copy: the NoNodeException handler below removes entries from knownChildren,
+ // and removing from a TreeMap while iterating its live keySet() view throws
+ // ConcurrentModificationException on the next iteration.
+ childNames = new TreeSet<>(knownChildren.keySet());
+
+ stats.setQueueLength(childNames.size());
+ for (String childName : childNames) {
+ if (childName != null && childName.startsWith(PREFIX)) {
+ try {
+ byte[] data = knownChildren.get(childName);
+ if (data == null) {
+ data = zookeeper.getData(dir + "/" + childName, null, null, true);
+ if (data != null) {
+ knownChildren.put(childName, data);
+ }
+ }
+ if (data != null) {
+ ZkNodeProps message = ZkNodeProps.load(data);
+ if (message.containsKey(requestIdKey)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Looking for {}, found {}", message.get(requestIdKey), requestId);
+ }
+ if(message.get(requestIdKey).equals(requestId)) return true;
}
- if(message.get(requestIdKey).equals(requestId)) return true;
}
+ } catch (KeeperException.NoNodeException e) {
+ // Another client removed the node first: forget it and try the next one.
+ knownChildren.remove(childName);
}
- } catch (KeeperException.NoNodeException e) {
- // Another client removed the node first, try next
}
}
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
}
return false;
}
@@ -109,8 +132,9 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
Timer.Context time = stats.time(dir + "_remove_event");
try {
String path = dir + "/" + event.getId();
- String responsePath = dir + "/" + RESPONSE_PREFIX
+ String responseId = RESPONSE_PREFIX
+ path.substring(path.lastIndexOf("-") + 1);
+ String responsePath = dir + "/" + responseId;
try {
zookeeper.setData(responsePath, event.getBytes(), true);
@@ -122,6 +146,16 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
zookeeper.delete(path, -1, true);
} catch (KeeperException.NoNodeException ignored) {
}
+
+ updateLock.lockInterruptibly();
+ try {
+ knownChildren.remove(event.getId());
+ knownChildren.put(responseId, event.getBytes());
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
+ }
} finally {
time.stop();
}
@@ -277,7 +311,7 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
try {
for (Pair<String, byte[]> element : peekElements(n, waitMillis, excludeSet)) {
if (log.isDebugEnabled()) log.debug("Add to topN {}", dir + "/" + element.first());
- topN.add(new QueueEvent(element.first(),
+ topN.add(new QueueEvent(new File(element.first()).getName(),
element.second(), null));
}
printQueueEventsListElementIds(topN);
@@ -305,16 +339,44 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
*/
public String getTailId() throws KeeperException, InterruptedException {
// TODO: could we use getChildren here? Unsure what freshness guarantee the caller needs.
- TreeSet<String> orderedChildren = fetchZkChildren(null, null);
+ TreeMap<String,byte[]> orderedChildren = fetchZkChildren(null, null);
- for (String headNode : orderedChildren.descendingSet())
+ // The tail is the last child in sorted order, so walk the children in descending order (as the
+ // previous descendingSet() loop did). Iterating entrySet() would walk ascending and return the
+ // id of the head of the queue instead of its tail.
+ for (Map.Entry<String,byte[]> headNode : orderedChildren.descendingMap().entrySet())
if (headNode != null) {
try {
- QueueEvent queueEvent = new QueueEvent(headNode, zookeeper.getData(dir + "/" + headNode,
- null, null, true), null);
+ byte[] data;
+ updateLock.lockInterruptibly();
+ try {
+ data = knownChildren.get(headNode.getKey());
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
+ }
+ if (data == null) {
+ // Not cached: read it from ZK, which also confirms the node still exists.
+ data = zookeeper.getData(dir + "/" + headNode.getKey(), null, null, true);
+ }
+ QueueEvent queueEvent = new QueueEvent(headNode.getKey(), data, null);
+
+ updateLock.lockInterruptibly();
+ try {
+ knownChildren.put(headNode.getKey(), data);
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
+ }
+
return queueEvent.getId();
} catch (KeeperException.NoNodeException e) {
- // Another client removed the node first, try next
+ // Another client removed the node first: drop it from the cache and try the next one.
+ updateLock.lockInterruptibly();
+ try {
+ knownChildren.remove(headNode.getKey());
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
+ }
}
}
return null;
@@ -346,7 +408,7 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
private volatile byte[] bytes;
QueueEvent(String id, byte[] bytes, WatchedEvent event) {
- if (log.isDebugEnabled()) log.debug("Create QueueEvent with id {}", id);
+ if (log.isDebugEnabled()) log.debug("Create QueueEvent with id {} {} {}", id, bytes != null ? bytes.length : 0, event);
this.id = id;
this.bytes = bytes;
this.event = event;
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
index b29768c..140443e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
@@ -16,15 +16,18 @@
*/
package org.apache.solr.cloud;
+import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
+import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -98,7 +101,7 @@ public class ZkDistributedQueue implements DistributedQueue {
* Therefore, methods like {@link #peek()} have to double-check actual node existence, and methods
* like {@link #poll()} must resolve any races by attempting to delete the underlying node.
*/
- protected volatile TreeSet<String> knownChildren;
+ protected volatile TreeMap<String,byte[]> knownChildren;
/**
* Used to wait on ZK changes to the child list; you must hold {@link #updateLock} before waiting on this condition.
@@ -134,14 +137,14 @@ public class ZkDistributedQueue implements DistributedQueue {
try {
try {
updateLock.lockInterruptibly();
- knownChildren = fetchZkChildren(null, null);
+ fetchZkChildren(null, null);
} catch (KeeperException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
- }finally {
+ } finally {
if (updateLock.isHeldByCurrentThread()) {
updateLock.unlock();
}
@@ -186,39 +189,38 @@ public class ZkDistributedQueue implements DistributedQueue {
@Override
public byte[] peek(long wait) throws KeeperException, InterruptedException {
byte[] result = null;
- try {
- Timer.Context time;
- if (wait == Long.MAX_VALUE) {
- time = stats.time(dir + "_peek_wait_forever");
- } else {
- time = stats.time(dir + "_peek_wait" + wait);
- }
- long waitNanos = TimeUnit.MILLISECONDS.toNanos(wait);
+ // Returns the data of the first queued element, waiting up to 'wait' ms for one to appear;
+ // returns null if none appears in time.
+ Timer.Context time;
+ if (wait == Long.MAX_VALUE) {
+ time = stats.time(dir + "_peek_wait_forever");
+ } else {
+ time = stats.time(dir + "_peek_wait" + wait);
+ }
+ // Stop the timer on every exit path; a Timer.Context that is never stopped records nothing.
+ try {
- result = firstElement();
- if (result != null) {
- return result;
- }
+ long waitNanos = TimeUnit.MILLISECONDS.toNanos(wait);
- ChildWatcher watcher = new ChildWatcher();
- TreeSet<String> foundChildren = fetchZkChildren(watcher, null);
+ // Fast path: the head of the queue is already known.
+ result = firstElement();
+ if (result != null) {
+ return result;
+ }
- TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
+ ChildWatcher watcher = new ChildWatcher();
+ TreeMap<String,byte[]> foundChildren = fetchZkChildren(watcher, null);
- waitForChildren(null, foundChildren, waitNanos, timeout, watcher);
- if (foundChildren.size() == 0) {
- return null;
- }
+ if (foundChildren.size() > 0) {
result = firstElement();
return result;
- } finally {
- try {
- zookeeper.getSolrZooKeeper().removeAllWatches(dir, Watcher.WatcherType.Children, false);
- } catch (Exception e) {
- log.info(e.getMessage());
- }
}
+
+ // Nothing queued yet: block until a child appears or the timeout expires.
+ TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
+
+ waitForChildren(null, foundChildren, waitNanos, timeout, watcher);
+ if (foundChildren.size() == 0) {
+ return null;
+ }
+ result = firstElement();
+ return result;
+ } finally {
+ time.stop();
+ }
}
/**
@@ -256,9 +258,22 @@ public class ZkDistributedQueue implements DistributedQueue {
}
}
+ // TODO: use async
public void remove(Collection<String> paths) throws KeeperException, InterruptedException {
if (log.isDebugEnabled()) log.debug("Remove paths from queue {} {}", dir, paths);
if (paths.isEmpty()) return;
+
+ updateLock.lockInterruptibly();
+ try {
+ for (String path : paths) {
+ knownChildren.remove(path);
+ }
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
+ }
+
List<Op> ops = new ArrayList<>();
for (String path : paths) {
ops.add(Op.delete(dir + "/" + path, -1));
@@ -283,16 +298,6 @@ public class ZkDistributedQueue implements DistributedQueue {
}
}
}
- updateLock.lockInterruptibly();
- try {
- for (String path : paths) {
- knownChildren.remove(path);
- }
- } finally {
- if (updateLock.isHeldByCurrentThread()) {
- updateLock.unlock();
- }
- }
}
/**
@@ -314,7 +319,7 @@ public class ZkDistributedQueue implements DistributedQueue {
}
ChildWatcher watcher = new ChildWatcher();
- TreeSet<String> foundChildren = fetchZkChildren(watcher, null);
+ TreeMap<String,byte[]> foundChildren = fetchZkChildren(watcher, null);
TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
@@ -410,18 +415,17 @@ public class ZkDistributedQueue implements DistributedQueue {
/**
* Returns the name if the first known child node, or {@code null} if the queue is empty.
- * This is the only place {@link #knownChildren} is ever updated!
- * The caller must double check that the actual node still exists, since the in-memory
- * list is inherently stale.
+ * @return
*/
- private String firstChild(boolean remove) {
+ private Map.Entry<String,byte[]> firstChild(boolean remove) {
try {
updateLock.lockInterruptibly();
try {
// We always return from cache first, the cache will be cleared if the node is not exist
if (!knownChildren.isEmpty()) {
- return remove ? knownChildren.pollFirst() : knownChildren.first();
+ return remove ? knownChildren.pollFirstEntry() : knownChildren.firstEntry();
}
+
return null;
} finally {
if (updateLock.isHeldByCurrentThread()) {
@@ -437,13 +441,47 @@ public class ZkDistributedQueue implements DistributedQueue {
/**
* Return the current set of children from ZK; does not change internal state.
*/
- TreeSet<String> fetchZkChildren(Watcher watcher, Predicate<String> acceptFilter) throws KeeperException {
- TreeSet<String> orderedChildren = new TreeSet<>();
+ TreeMap<String,byte[]> fetchZkChildren(Watcher watcher, Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
+
+ TreeMap<String,byte[]> orderedChildren = new TreeMap<>();
+ updateLock.lockInterruptibly();
+ try {
+ if (knownChildren != null && !knownChildren.isEmpty()) {
+ Set<Map.Entry<String,byte[]>> entrySet = knownChildren.entrySet();
+ for (Map.Entry<String,byte[]> entry : entrySet) {
+ String childName = entry.getKey();
+ // Check format
+ if (!childName.startsWith(PREFIX)) {
+
+ // responses can be written to same queue with different naming scheme
+ if (log.isDebugEnabled()) log.debug("Found child node with improper name: {}, prefix={}", childName, PREFIX);
+ continue;
+ }
+ if (acceptFilter != null && acceptFilter.test(childName)) {
+ if (log.isDebugEnabled()) log.debug("Found child that matched exclude filter: {}", dir + "/" + childName);
+ continue;
+ }
+
+ orderedChildren.put(childName, entry.getValue());
+ }
+ }
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
+ }
+
+ if (!orderedChildren.isEmpty()) {
+ return orderedChildren;
+ }
+
+ TreeMap<String,byte[]> remoteKnownChildren = new TreeMap<>();
try {
List<String> childNames = zookeeper.getChildren(dir, watcher, true);
stats.setQueueLength(childNames.size());
for (String childName : childNames) {
if (log.isDebugEnabled()) log.debug("Examine child: {} out of {} {}", childName, childNames.size(), acceptFilter);
+ remoteKnownChildren.put(childName, null);
// Check format
if (!childName.regionMatches(0, PREFIX, 0, PREFIX.length())) {
@@ -456,8 +494,9 @@ public class ZkDistributedQueue implements DistributedQueue {
continue;
}
if (log.isDebugEnabled()) log.debug("Add child to fetched children: {}", childName);
- orderedChildren.add(childName);
+ orderedChildren.put(childName, null);
}
+ updateKnownChildren(remoteKnownChildren);
return orderedChildren;
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
@@ -465,6 +504,28 @@ public class ZkDistributedQueue implements DistributedQueue {
}
}
+ /**
+ * Replaces {@link #knownChildren} with {@code children}, carrying over any data bytes already
+ * cached for child names that are still present. Names with no cached data keep the value they
+ * were passed with.
+ * <p>Keys are expected to be bare child names. A key starting with "/" that has cached data aborts
+ * the update: the error is logged and {@code knownChildren} is left unchanged, although entries of
+ * {@code children} visited before the bad key have already been filled in.
+ */
+ private void updateKnownChildren(TreeMap<String,byte[]> children) {
+ updateLock.lock();
+ try {
+ for (Map.Entry<String,byte[]> entry : children.entrySet()) {
+ String childName = entry.getKey();
+ byte[] data = knownChildren == null ? null : knownChildren.get(childName);
+ if (data != null) {
+ if (childName.startsWith("/")) {
+ throw new IllegalArgumentException(childName);
+ }
+ // replacing a value is not a structural change, so it is safe while iterating
+ entry.setValue(data);
+ }
+ }
+ knownChildren = children;
+ } catch (Exception e) {
+ log.error("Failed to update known children for queue {}", dir, e);
+ } finally {
+ updateLock.unlock();
+ }
+ }
+
/**
* Return the currently-known set of elements, using child names from memory. If no children are found, or no
* children pass {@code acceptFilter}, waits up to {@code waitMillis} for at least one child to become available.
@@ -472,11 +533,11 @@ public class ZkDistributedQueue implements DistributedQueue {
* Package-private to support {@link OverseerTaskQueue} specifically.</p>
*/
@Override
- public Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws KeeperException {
+ public Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
if (log.isDebugEnabled()) log.debug("peekElements {} {}", max, acceptFilter);
List<Pair<String,byte[]>> result = null;
ChildWatcher watcher = new ChildWatcher();
- TreeSet<String> foundChildren = fetchZkChildren(watcher, acceptFilter);
+ TreeMap<String,byte[]> foundChildren = fetchZkChildren(watcher, acceptFilter);
long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
try {
@@ -487,98 +548,122 @@ public class ZkDistributedQueue implements DistributedQueue {
// Technically we could restart the method if we fasil to actually obtain any valid children
// from ZK, but this is a super rare case, and the latency of the ZK fetches would require
// much more sophisticated waitNanos tracking.
- result = new ArrayList<>(foundChildren.size());
- for (String child : foundChildren) {
- if (result.size() >= max) {
- break;
- }
- try {
- byte[] data = zookeeper.getData(dir + "/" + child, null, null, true);
- if (log.isDebugEnabled()) log.debug("get data for child={}", child);
- result.add(new Pair<>(child, data));
- } catch (KeeperException.NoNodeException e) {
- if (log.isDebugEnabled()) log.debug("no node found for child={}", child);
- updateLock.lockInterruptibly();
- try {
- knownChildren.remove(child);
- } finally {
- if (updateLock.isHeldByCurrentThread()) {
- updateLock.unlock();
- }
- }
- }
- }
- return result;
+ // Select at most 'max' children in queue (sorted) order. Data already cached in knownChildren is
+ // reused; the remaining paths are read from ZK in one batch below.
+ List<String> selected = new ArrayList<>();
+ Map<String,byte[]> dataByChild = new HashMap<>();
+ Set<String> dataPaths = new HashSet<>();
+ updateLock.lockInterruptibly();
+ try {
+ for (String child : foundChildren.keySet()) {
+ if (selected.size() >= max) {
+ break;
+ }
+ selected.add(child);
+ byte[] data = knownChildren.get(child);
+ if (data == null) {
+ dataPaths.add(dir + "/" + child);
+ if (log.isDebugEnabled()) log.debug("get data for child={}", child);
+ } else {
+ dataByChild.put(child, data);
+ }
+ }
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
+ }
+
+ if (!dataPaths.isEmpty()) {
+ Map<String,byte[]> dataMap = zookeeper.getData(dataPaths);
+ updateLock.lockInterruptibly();
+ try {
+ // forEach (rather than an entrySet iterator) is safe even if the returned map is a
+ // synchronized wrapper.
+ dataMap.forEach((path, bytes) -> {
+ // Cache and report by bare child name, never by full path, so results read from ZK look
+ // exactly like results served from the cache.
+ String child = path.substring(path.lastIndexOf('/') + 1);
+ if (bytes != null) {
+ dataByChild.put(child, bytes);
+ knownChildren.put(child, bytes);
+ } else {
+ // the node was removed by another client after we listed it
+ if (log.isDebugEnabled()) log.debug("no node found for child={}", child);
+ knownChildren.remove(child);
+ }
+ });
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
+ }
+ }
+
+ // Assemble the result in queue order (callers such as the Overseer process it FIFO), skipping
+ // children that vanished before their data could be read.
+ result = new ArrayList<>(selected.size());
+ for (String child : selected) {
+ byte[] data = dataByChild.get(child);
+ if (data != null) {
+ result.add(new Pair<>(child, data));
+ }
+ }
+ return result;
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
throw new AlreadyClosedException(e);
- } finally {
- try {
- zookeeper.getSolrZooKeeper().removeAllWatches(dir, Watcher.WatcherType.Children, false);
- } catch (Exception e) {
- log.info(e.getMessage());
- }
}
}
- private void waitForChildren(Predicate<String> acceptFilter, TreeSet<String> foundChildren, long waitNanos, TimeOut timeout, ChildWatcher watcher) throws InterruptedException, KeeperException {
+ /**
+ * Fills {@code foundChildren} with queue children not rejected by {@code acceptFilter}. It first
+ * checks the in-memory cache and then polls (10 ms steps) until a child appears, the watcher fires
+ * with new children, or {@code timeout} expires. Child watches on the queue dir are removed on exit.
+ * Only names starting with PREFIX are accepted, matching fetchZkChildren: knownChildren may also
+ * hold other nodes written to the same dir (e.g. response nodes).
+ */
+ private void waitForChildren(Predicate<String> acceptFilter, TreeMap<String,byte[]> foundChildren, long waitNanos, TimeOut timeout, ChildWatcher watcher) throws InterruptedException, KeeperException {
if (log.isDebugEnabled()) log.debug("wait for children ... {}", waitNanos);
- updateLock.lockInterruptibly();
try {
- for (String child : knownChildren) {
- if (acceptFilter == null || !acceptFilter.test(child)) {
- foundChildren.add(child);
- }
- }
- } finally {
- if (updateLock.isHeldByCurrentThread()) {
- updateLock.unlock();
- }
- }
- if (!foundChildren.isEmpty()) {
- return;
- }
- if (waitNanos <= 0) {
- if (log.isDebugEnabled()) log.debug("0 wait time and no children found, return");
- return;
- }
- TreeSet<String> fc = null;
- while (fc == null || fc.isEmpty()) {
- if (watcher.fired) {
- watcher.fired = false;
- fc = fetchZkChildren(watcher, acceptFilter);
- if (!fc.isEmpty()) {
- foundChildren.addAll(fc);
- return;
- }
- }
updateLock.lockInterruptibly();
try {
- try {
- changed.await(10, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
- }
- if (zookeeper.isClosed() || !zookeeper.isConnected()) {
- throw new AlreadyClosedException();
- }
- if (timeout.hasTimedOut()) {
- return;
- }
- for (String child : knownChildren) {
- if (acceptFilter == null || !acceptFilter.test(child)) {
- foundChildren.add(child);
+ for (Map.Entry<String,byte[]> child : knownChildren.entrySet()) {
+ // skip non-queue entries (bad prefix) exactly like fetchZkChildren does
+ if (child.getKey().startsWith(PREFIX) && (acceptFilter == null || !acceptFilter.test(child.getKey()))) {
+ foundChildren.put(child.getKey(), child.getValue());
}
}
- if (!foundChildren.isEmpty()) {
- return;
- }
} finally {
if (updateLock.isHeldByCurrentThread()) {
updateLock.unlock();
}
}
+ if (!foundChildren.isEmpty()) {
+ return;
+ }
+ if (waitNanos <= 0) {
+ if (log.isDebugEnabled()) log.debug("0 wait time and no children found, return");
+ return;
+ }
+ TreeMap<String,byte[]> fc = null;
+ while (fc == null || fc.isEmpty()) {
+ if (watcher.fired) {
+ watcher.fired = false;
+ fc = fetchZkChildren(watcher, acceptFilter);
+ if (!fc.isEmpty()) {
+ foundChildren.putAll(fc);
+ return;
+ }
+ }
+ updateLock.lockInterruptibly();
+ try {
+ try {
+ changed.await(10, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e);
+ }
+ if (zookeeper.isClosed() || !zookeeper.isConnected()) {
+ throw new AlreadyClosedException();
+ }
+ if (timeout.hasTimedOut()) {
+ return;
+ }
+ for (Map.Entry<String,byte[]> child : knownChildren.entrySet()) {
+ // same prefix + filter rule as the initial cache scan above
+ if (child.getKey().startsWith(PREFIX) && (acceptFilter == null || !acceptFilter.test(child.getKey()))) {
+ foundChildren.put(child.getKey(), child.getValue());
+ }
+ }
+ if (!foundChildren.isEmpty()) {
+ return;
+ }
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
+ }
+ }
+ } finally {
+ try {
+ zookeeper.getSolrZooKeeper().removeAllWatches(dir, Watcher.WatcherType.Children, false);
+ } catch (Exception e) {
+ log.info(e.getMessage());
+ }
}
}
@@ -590,16 +675,48 @@ public class ZkDistributedQueue implements DistributedQueue {
private byte[] firstElement() throws KeeperException {
try {
- String firstChild = null;
- firstChild = firstChild(false);
- if (firstChild == null) {
- return null;
+ Map.Entry<String,byte[]> firstChild = firstChild(false);
+ if (firstChild == null) {
+ return null;
+ }
+ byte[] data;
+ updateLock.lockInterruptibly();
+ try {
+ data = knownChildren.get(firstChild.getKey());
+ if (data != null) {
+ return data;
+ }
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
+ }
+
+ try {
+ data = zookeeper.getData(dir + "/" + firstChild.getKey(), null, null, true);
+ if (data != null) {
+ updateLock.lockInterruptibly();
+ try {
+ knownChildren.put(firstChild.getKey(), data);
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
+ }
}
+
+ return data;
+ } catch (KeeperException.NoNodeException e) {
+ updateLock.lockInterruptibly();
try {
- return zookeeper.getData(dir + "/" + firstChild, null, null, true);
- } catch (KeeperException.NoNodeException e) {
- return null;
+ knownChildren.remove(firstChild.getKey());
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
}
+ return null;
+ }
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
@@ -609,14 +726,37 @@ public class ZkDistributedQueue implements DistributedQueue {
private byte[] removeFirst() throws KeeperException {
try {
- String firstChild = firstChild(true);
+ Map.Entry<String,byte[]> firstChild = firstChild(true);
if (firstChild == null) {
return null;
}
+
+ byte[] data;
+ updateLock.lockInterruptibly();
try {
- String path = dir + "/" + firstChild;
+ data = knownChildren.get(firstChild.getKey());
+ if (data != null) {
+ return data;
+ }
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
+ }
+
+ try {
+ String path = dir + "/" + firstChild.getKey();
byte[] result = zookeeper.getData(path, null, null, true);
zookeeper.delete(path, -1, true);
+ updateLock.lockInterruptibly();
+ try {
+ knownChildren.remove(firstChild.getKey());
+
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
+ }
return result;
} catch (KeeperException.NoNodeException e) {
return null;
@@ -646,10 +786,10 @@ public class ZkDistributedQueue implements DistributedQueue {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
updateLock.lock();
try {
- knownChildren = fetchZkChildren(null, null);
+ fetchZkChildren(null, null);
fired = true;
changed.signalAll();
- } catch (KeeperException e) {
+ } catch (KeeperException | InterruptedException e) {
log.error("", e);
} finally {
updateLock.unlock();
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 405083e..88c9bbc 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -41,6 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
@@ -727,6 +728,43 @@ public class SolrZkClient implements Closeable {
}
}
+ public Map<String,byte[]> getData(Set<String> paths) {
+
+ Map<String,byte[]> dataMap = new ConcurrentHashMap<>(paths.size());
+ CountDownLatch latch = new CountDownLatch(paths.size());
+
+ for (String path : paths) {
+ ZooKeeper keeper = connManager.getKeeper();
+ assert keeper != null;
+ keeper.getData(path, false, (rc, path1, ctx, data, stat) -> {
+ if (rc != 0) {
+ final KeeperException.Code keCode = KeeperException.Code.get(rc);
+ if (keCode == KeeperException.Code.NONODE) {
+ if (log.isDebugEnabled()) log.debug("No node found for {}", path1);
+ }
+ }
+
+ dataMap.put(path1, data);
+ latch.countDown();
+ }, null);
+ }
+
+ boolean success;
+ try {
+ success = latch.await(15, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e);
+ log.error("mkDirs(String=" + paths + ")", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+ }
+
+ if (!success) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting for operations to complete");
+ }
+
+ return dataMap;
+ }
+
// Calls setData for a list of existing paths in parallel
private void updateExistingPaths(List<String> pathsToUpdate, Map<String,byte[]> dataMap) throws KeeperException {
final KeeperException[] keeperExceptions = new KeeperException[1];