You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/10/31 07:12:28 UTC
[lucene-solr] branch reference_impl_dev updated: @1097 And beyond.
Go Overseer.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new bafc793 @1097 And beyond. Go Overseer.
bafc793 is described below
commit bafc7934c082ddb6d5e18afb3561f8a664c1a967
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sat Oct 31 02:11:56 2020 -0500
@1097 And beyond. Go Overseer.
---
.../src/java/org/apache/solr/cloud/Overseer.java | 37 +--
.../java/org/apache/solr/cloud/ZkController.java | 2 +-
.../org/apache/solr/cloud/ZkDistributedQueue.java | 258 +++++++++++----------
.../solr/cloud/api/collections/DeleteShardCmd.java | 27 ++-
.../apache/solr/cloud/overseer/ZkStateWriter.java | 7 -
.../src/java/org/apache/solr/core/SolrCore.java | 4 +-
.../org/apache/solr/cloud/DeleteShardTest.java | 5 +-
.../apache/solr/cloud/DistributedQueueTest.java | 66 +++---
.../test/org/apache/solr/cloud/OverseerTest.java | 8 +-
.../solr/client/solrj/cloud/DistributedQueue.java | 12 +-
.../solrj/request/CollectionAdminRequest.java | 10 +-
.../java/org/apache/solr/common/util/TimeOut.java | 8 +-
.../apache/solr/cloud/MiniSolrCloudCluster.java | 2 +-
13 files changed, 245 insertions(+), 201 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 0fc4350..c5582cd 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -253,7 +253,7 @@ public class Overseer implements SolrCloseable {
// if there were any errors while processing
// the state queue, items would have been left in the
// work queue so let's process those first
- byte[] data = fallbackQueue.peek();
+ byte[] data = fallbackQueue.peek(null);
// TODO: can we do this with a builk call instead?
while (fallbackQueueSize > 0 && data != null) {
final ZkNodeProps message = ZkNodeProps.load(data);
@@ -275,7 +275,7 @@ public class Overseer implements SolrCloseable {
log.warn(
"Exception when process message = {}, consider as bad message and poll out from the queue",
message);
- fallbackQueue.poll();
+ fallbackQueue.poll(null);
}
} catch (InterruptedException e1) {
ParWork.propagateInterrupt(e);
@@ -285,8 +285,8 @@ public class Overseer implements SolrCloseable {
}
throw exp;
}
- fallbackQueue.poll(); // poll-ing removes the element we got by peek-ing
- data = fallbackQueue.peek();
+ fallbackQueue.poll(null); // poll-ing removes the element we got by peek-ing
+ data = fallbackQueue.peek(null);
fallbackQueueSize--;
}
// force flush at the end of the loop, if there are no pending updates, this is a no op call
@@ -337,28 +337,37 @@ public class Overseer implements SolrCloseable {
}
try {
Set<String> processedNodes = new HashSet<>();
- TimeOut timeout = new TimeOut(1, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+ TimeOut timeout = new TimeOut(3, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (queue != null && !queue.isEmpty()) {
if (isClosed()) {
log.info("Closing");
return;
}
- if (timeout.hasTimedOut()) {
- break;
- }
for (Pair<String, byte[]> head : queue) {
byte[] data = head.second();
- final ZkNodeProps message = ZkNodeProps.load(data);
- if (log.isDebugEnabled()) log.debug("processMessage: queueSize: {}, message = {}", stateUpdateQueue.getZkStats().getQueueLength(), message);
- if (log.isDebugEnabled()) log.debug("add processed node: {}, processedNodes = {}", head.first(), stateUpdateQueue.getZkStats().getQueueLength(), processedNodes);
- processedNodes.add(new File(head.first()).getName());
- // The callback always be called on this thread
- processQueueItem(message, reader.getClusterState(), zkStateWriter, true, null);
+
+ if (log.isDebugEnabled()) log.debug("look at node {} data={}", head.first(), head.second() == null ? null : head.second().length);
+ if (head.second() != null && head.second().length > 0) {
+ final ZkNodeProps message = ZkNodeProps.load(data);
+ if (log.isDebugEnabled()) log.debug("processMessage: queueSize: {}, message = {}", stateUpdateQueue.getZkStats().getQueueLength(), message);
+ if (log.isDebugEnabled()) log.debug("add processed node: {}, processedNodes = {}", head.first(), stateUpdateQueue.getZkStats().getQueueLength(), processedNodes);
+ processedNodes.add(new File(head.first()).getName());
+ // The callback always be called on this thread
+ processQueueItem(message, reader.getClusterState(), zkStateWriter, true, null);
+ } else {
+ log.warn("Found queue item with no data, removing it {} : {}", head.first(), new File(head.first()).getName());
+ processedNodes.add(new File(head.first()).getName());
+ }
}
if (isClosed()) {
log.info("Overseer closed, exiting loop");
return;
}
+
+ if (timeout.hasTimedOut()) {
+ if (log.isDebugEnabled()) log.debug("timeout, skipping out on tight loop {}", timeout.getInterval(TimeUnit.MILLISECONDS));
+ break;
+ }
// if an event comes in the next *ms batch it together
int wait = 10;
if (log.isDebugEnabled()) log.debug("going to peekElements processedNodes={}", processedNodes);
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index ab3339c..a52567b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -2728,7 +2728,7 @@ public class ZkController implements Closeable, Runnable {
if (replicaRemoved) {
try {
log.info("Replica {} removed from clusterstate, remove it.", coreName);
- // getCoreContainer().unload(coreName, true, true, true); // nocommit - this causes bad things in tests
+ getCoreContainer().unload(coreName, true, true, true); // nocommit - this causes bad things in tests
} catch (SolrException e) {
if (!e.getMessage().contains("Cannot unload non-existent core")) {
// no need to log if the core was already unloaded
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
index 140443e..ed18f61 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
@@ -98,8 +98,8 @@ public class ZkDistributedQueue implements DistributedQueue {
* Contains the last set of children fetched from ZK. Elements are removed from the head of
* this in-memory set as they are consumed from the queue. Due to the distributed nature
* of the queue, elements may appear in this set whose underlying nodes have been consumed in ZK.
- * Therefore, methods like {@link #peek()} have to double-check actual node existence, and methods
- * like {@link #poll()} must resolve any races by attempting to delete the underlying node.
+ * Therefore, methods like {@link #peek(Predicate<String>)} have to double-check actual node existence, and methods
+ * like {@link #poll(Predicate<String>)} must resolve any races by attempting to delete the underlying node.
*/
protected volatile TreeMap<String,byte[]> knownChildren;
@@ -158,10 +158,10 @@ public class ZkDistributedQueue implements DistributedQueue {
* @return data at the first element of the queue, or null.
*/
@Override
- public byte[] peek() throws KeeperException, InterruptedException {
+ public byte[] peek(Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
Timer.Context time = stats.time(dir + "_peek");
try {
- return firstElement();
+ return firstElement(acceptFilter);
} finally {
time.stop();
}
@@ -175,8 +175,8 @@ public class ZkDistributedQueue implements DistributedQueue {
* @return data at the first element of the queue, or null.
*/
@Override
- public byte[] peek(boolean block) throws KeeperException, InterruptedException {
- return block ? peek(Long.MAX_VALUE) : peek();
+ public byte[] peek(Predicate<String> acceptFilter, boolean block) throws KeeperException, InterruptedException {
+ return block ? peek(acceptFilter, Long.MAX_VALUE) : peek(acceptFilter);
}
/**
@@ -187,7 +187,7 @@ public class ZkDistributedQueue implements DistributedQueue {
* @return data at the first element of the queue, or null.
*/
@Override
- public byte[] peek(long wait) throws KeeperException, InterruptedException {
+ public byte[] peek(Predicate<String> acceptFilter, long wait) throws KeeperException, InterruptedException {
byte[] result = null;
Timer.Context time;
@@ -199,7 +199,7 @@ public class ZkDistributedQueue implements DistributedQueue {
long waitNanos = TimeUnit.MILLISECONDS.toNanos(wait);
- result = firstElement();
+ result = firstElement(acceptFilter);
if (result != null) {
return result;
}
@@ -208,17 +208,17 @@ public class ZkDistributedQueue implements DistributedQueue {
TreeMap<String,byte[]> foundChildren = fetchZkChildren(watcher, null);
if (foundChildren.size() > 0) {
- result = firstElement();
+ result = firstElement(acceptFilter);
return result;
}
TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
- waitForChildren(null, foundChildren, waitNanos, timeout, watcher);
+ waitForChildren(null, foundChildren, timeout, watcher);
if (foundChildren.size() == 0) {
return null;
}
- result = firstElement();
+ result = firstElement(acceptFilter);
return result;
}
@@ -230,10 +230,10 @@ public class ZkDistributedQueue implements DistributedQueue {
* @return Head of the queue or null.
*/
@Override
- public byte[] poll() throws KeeperException, InterruptedException {
+ public byte[] poll(Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
Timer.Context time = stats.time(dir + "_poll");
try {
- return removeFirst();
+ return removeFirst(acceptFilter);
} finally {
time.stop();
}
@@ -245,10 +245,10 @@ public class ZkDistributedQueue implements DistributedQueue {
* @return The former head of the queue
*/
@Override
- public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {
+ public byte[] remove(Predicate<String> acceptFilter) throws NoSuchElementException, KeeperException{
Timer.Context time = stats.time(dir + "_remove");
try {
- byte[] result = removeFirst();
+ byte[] result = removeFirst(acceptFilter);
if (result == null) {
throw new NoSuchElementException();
}
@@ -306,29 +306,29 @@ public class ZkDistributedQueue implements DistributedQueue {
* @return The former head of the queue
*/
@Override
- public byte[] take() throws KeeperException, InterruptedException {
+ public byte[] take(Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
// Same as for element. Should refactor this.
Timer.Context timer = stats.time(dir + "_take");
updateLock.lockInterruptibly();
try {
long waitNanos = TimeUnit.MILLISECONDS.toNanos(60000);
- byte[] result = removeFirst();
+ byte[] result = removeFirst(acceptFilter);
if (result != null) {
return result;
}
ChildWatcher watcher = new ChildWatcher();
- TreeMap<String,byte[]> foundChildren = fetchZkChildren(watcher, null);
+ TreeMap<String,byte[]> foundChildren = fetchZkChildren(watcher, acceptFilter);
TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
- waitForChildren(null, foundChildren, waitNanos, timeout, watcher);
+ waitForChildren( s -> s.startsWith(PREFIX) || acceptFilter.test(s), foundChildren, timeout, watcher);
if (foundChildren.size() == 0) {
return null;
}
- result = removeFirst();
+ result = removeFirst(acceptFilter);
if (result != null) {
return result;
}
@@ -348,6 +348,7 @@ public class ZkDistributedQueue implements DistributedQueue {
@Override
public void offer(byte[] data) throws KeeperException {
Timer.Context time = stats.time(dir + "_offer");
+ if (log.isDebugEnabled()) log.debug("Over item to queue {}", dir);
try {
try {
if (maxQueueSize > 0) {
@@ -417,22 +418,38 @@ public class ZkDistributedQueue implements DistributedQueue {
* Returns the name if the first known child node, or {@code null} if the queue is empty.
* @return
*/
- private Map.Entry<String,byte[]> firstChild(boolean remove) {
+ private Map.Entry<String,byte[]> firstChild(boolean remove, Predicate<String> acceptFilter) {
try {
updateLock.lockInterruptibly();
try {
// We always return from cache first, the cache will be cleared if the node is not exist
if (!knownChildren.isEmpty()) {
- return remove ? knownChildren.pollFirstEntry() : knownChildren.firstEntry();
+ for (Map.Entry<String,byte[]> entry : knownChildren.entrySet()) {
+ if (acceptFilter != null && acceptFilter.test(entry.getKey())) {
+ continue;
+ }
+ if (remove) {
+ knownChildren.remove(entry.getKey());
+ try {
+ zookeeper.delete(dir + "/" + entry.getKey(), -1);
+ } catch (KeeperException.NoNodeException e) {
+ if (log.isDebugEnabled()) log.debug("No node found for {}", entry.getKey());
+ }
+ return entry;
+ }
+ }
+ return null;
}
-
- return null;
} finally {
if (updateLock.isHeldByCurrentThread()) {
updateLock.unlock();
}
}
- } catch (InterruptedException e) {
+
+ fetchZkChildren(null, acceptFilter);
+
+ return null;
+ } catch (InterruptedException | KeeperException e) {
ParWork.propagateInterrupt(e);
throw new AlreadyClosedException(e);
}
@@ -442,7 +459,7 @@ public class ZkDistributedQueue implements DistributedQueue {
* Return the current set of children from ZK; does not change internal state.
*/
TreeMap<String,byte[]> fetchZkChildren(Watcher watcher, Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
-
+ if (log.isDebugEnabled()) log.debug("fetchZkChildren");
TreeMap<String,byte[]> orderedChildren = new TreeMap<>();
updateLock.lockInterruptibly();
try {
@@ -454,14 +471,14 @@ public class ZkDistributedQueue implements DistributedQueue {
if (!childName.startsWith(PREFIX)) {
// responses can be written to same queue with different naming scheme
- if (log.isDebugEnabled()) log.debug("Found child node with improper name: {}, prefix={}", childName, PREFIX);
+ if (log.isDebugEnabled()) log.debug("Filtering child out by prefix name=: {}, prefix={}", childName, PREFIX);
continue;
}
if (acceptFilter != null && acceptFilter.test(childName)) {
if (log.isDebugEnabled()) log.debug("Found child that matched exclude filter: {}", dir + "/" + childName);
continue;
}
-
+ if (log.isDebugEnabled()) log.debug("found: {}", childName);
orderedChildren.put(childName, entry.getValue());
}
}
@@ -472,31 +489,33 @@ public class ZkDistributedQueue implements DistributedQueue {
}
if (!orderedChildren.isEmpty()) {
+ if (log.isDebugEnabled()) log.debug("found children from fetch {}", orderedChildren.size());
return orderedChildren;
}
-
+ if (log.isDebugEnabled()) log.debug("found no children to fetch");
TreeMap<String,byte[]> remoteKnownChildren = new TreeMap<>();
try {
List<String> childNames = zookeeper.getChildren(dir, watcher, true);
stats.setQueueLength(childNames.size());
for (String childName : childNames) {
- if (log.isDebugEnabled()) log.debug("Examine child: {} out of {} {}", childName, childNames.size(), acceptFilter);
- remoteKnownChildren.put(childName, null);
+ if (log.isDebugEnabled()) log.debug("Examine child: {} out of children={} acceptFilter={}", childName, childNames.size(), acceptFilter);
// Check format
- if (!childName.regionMatches(0, PREFIX, 0, PREFIX.length())) {
-
+ if (!childName.startsWith(PREFIX)) {
// responses can be written to same queue with different naming scheme
- if (log.isDebugEnabled()) log.debug("Found child node with improper name: {}", childName);
+ if (log.isDebugEnabled()) log.debug("Excluding child by prefix: {}", childName);
continue;
}
+ remoteKnownChildren.put(childName, null);
if (acceptFilter != null && acceptFilter.test(childName)) {
- if (log.isDebugEnabled()) log.debug("Found child that matched exclude filter: {}", dir + "/" + childName);
+ if (log.isDebugEnabled()) log.debug("Found child that matched exclude filter: {}", childName);
continue;
}
if (log.isDebugEnabled()) log.debug("Add child to fetched children: {}", childName);
orderedChildren.put(childName, null);
}
+ if (log.isDebugEnabled()) log.debug("found {} remote children", remoteKnownChildren.size());
updateKnownChildren(remoteKnownChildren);
+ if (log.isDebugEnabled()) log.debug("returning {} matched children", orderedChildren.size());
return orderedChildren;
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
@@ -505,20 +524,20 @@ public class ZkDistributedQueue implements DistributedQueue {
}
private void updateKnownChildren(TreeMap<String,byte[]> children) {
+ if (log.isDebugEnabled()) log.debug("Update known children size={}", children.size());
+ TreeMap<String,byte[]> newKnownChildren = new TreeMap<>();
updateLock.lock();
try {
Set<Map.Entry<String,byte[]>> entrySet = children.entrySet();
for (Map.Entry<String,byte[]> entry : entrySet) {
String childName = entry.getKey();
- byte[] data = knownChildren == null ? null : knownChildren.get(childName);
- if (data != null) {
- if (childName.startsWith("/")) {
- throw new IllegalArgumentException(childName);
- }
- children.put(childName, data);
+ byte[] data = entry.getValue();
+ if (data == null) {
+ data = knownChildren == null ? null : knownChildren.get(childName);
}
+ newKnownChildren.put(childName, data);
}
- knownChildren = children;
+ knownChildren = newKnownChildren;
} catch (Exception e) {
log.error("", e);
} finally {
@@ -542,12 +561,14 @@ public class ZkDistributedQueue implements DistributedQueue {
TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
try {
if (foundChildren.size() == 0) {
- waitForChildren(acceptFilter, foundChildren, waitNanos, timeout, watcher);
+ if (log.isDebugEnabled()) log.debug("found no children, watch for them excludeFilter={}", acceptFilter);
+ waitForChildren(acceptFilter, foundChildren, timeout, watcher);
}
// Technically we could restart the method if we fasil to actually obtain any valid children
// from ZK, but this is a super rare case, and the latency of the ZK fetches would require
// much more sophisticated waitNanos tracking.
+ if (log.isDebugEnabled()) log.debug("found children to process {}", foundChildren.size());
result = Collections.synchronizedList(new ArrayList<>(foundChildren.size()));
Set<String> dataPaths = new HashSet<>();
for (Map.Entry<String,byte[]> child : foundChildren.entrySet()) {
@@ -555,13 +576,15 @@ public class ZkDistributedQueue implements DistributedQueue {
break;
}
- byte[] data;
- updateLock.lockInterruptibly();
- try {
- data = knownChildren.get(child.getKey());
- } finally {
- if (updateLock.isHeldByCurrentThread()) {
- updateLock.unlock();
+ byte[] data = child.getValue();
+ if (data == null) {
+ updateLock.lockInterruptibly();
+ try {
+ data = knownChildren.get(child.getKey());
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
}
}
@@ -570,20 +593,20 @@ public class ZkDistributedQueue implements DistributedQueue {
dataPaths.add(dir + "/" + child.getKey());
if (log.isDebugEnabled()) log.debug("get data for child={}", child.getKey());
} else {
+ if (log.isDebugEnabled()) log.debug("found data locally already {}", child.getKey());
result.add(new Pair<>(child.getKey(), data));
}
}
+ if (log.isDebugEnabled()) log.debug("fetch data for paths {}", dataPaths);
Map<String,byte[]> dataMap = zookeeper.getData(dataPaths);
updateLock.lockInterruptibly();
+ List<Pair<String,byte[]>> finalResult = result;
try {
- List<Pair<String,byte[]>> finalResult = result;
dataMap.forEach((k, bytes) -> {
finalResult.add(new Pair<>(k, bytes));
if (bytes != null) {
knownChildren.put(new File(k).getName(), bytes);
- } else {
- knownChildren.remove(new File(k).getName());
}
});
} finally {
@@ -591,79 +614,77 @@ public class ZkDistributedQueue implements DistributedQueue {
updateLock.unlock();
}
}
-
- return new ArrayList<>(result);
+ if (log.isDebugEnabled()) log.debug("peek elements returning {} nodes", finalResult.size());
+ return new ArrayList<>(finalResult);
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
throw new AlreadyClosedException(e);
}
}
- private void waitForChildren(Predicate<String> acceptFilter, TreeMap<String,byte[]> foundChildren, long waitNanos, TimeOut timeout, ChildWatcher watcher) throws InterruptedException, KeeperException {
- if (log.isDebugEnabled()) log.debug("wait for children ... {}", waitNanos);
+ private void waitForChildren(Predicate<String> acceptFilter, TreeMap<String,byte[]> foundChildren, TimeOut timeout, ChildWatcher watcher) throws InterruptedException, KeeperException {
+ if (log.isDebugEnabled()) log.debug("wait for children ... {}ms", timeout.getInterval(TimeUnit.MILLISECONDS));
+
+ updateLock.lockInterruptibly();
try {
+ for (Map.Entry<String,byte[]> child : knownChildren.entrySet()) {
+ if (!child.getKey().startsWith(PREFIX) && (acceptFilter == null || !acceptFilter.test(child.getKey()))) {
+ foundChildren.put(child.getKey(), child.getValue());
+ }
+ }
+ } finally {
+ if (updateLock.isHeldByCurrentThread()) {
+ updateLock.unlock();
+ }
+ }
+ if (!foundChildren.isEmpty()) {
+ if (log.isDebugEnabled()) log.debug("Found new children ... {}", foundChildren.size());
+ return;
+ }
+ if (timeout.hasTimedOut()) {
+ if (log.isDebugEnabled()) log.debug("0 wait time and no children found, return");
+ return;
+ }
+ TreeMap<String,byte[]> fc = null;
+ while (fc == null || fc.isEmpty()) {
+ fc = fetchZkChildren(watcher, acceptFilter);
+ if (!fc.isEmpty()) {
+ foundChildren.putAll(fc);
+ return;
+ }
updateLock.lockInterruptibly();
try {
+ try {
+ changed.await(Math.min(timeout.getInterval(TimeUnit.MILLISECONDS), 5000), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e);
+ }
+ if (zookeeper.isClosed() || !zookeeper.isConnected()) {
+ throw new AlreadyClosedException();
+ }
+ if (timeout.hasTimedOut()) {
+ return;
+ }
for (Map.Entry<String,byte[]> child : knownChildren.entrySet()) {
if (acceptFilter == null || !acceptFilter.test(child.getKey())) {
foundChildren.put(child.getKey(), child.getValue());
}
}
+ if (!foundChildren.isEmpty()) {
+ try {
+ if (log.isDebugEnabled()) log.debug("Remove watches for {}");
+ zookeeper.getSolrZooKeeper().removeAllWatches(dir, Watcher.WatcherType.Children, false);
+ } catch (Exception e) {
+ log.info(e.getMessage());
+ }
+ return;
+ }
+
} finally {
if (updateLock.isHeldByCurrentThread()) {
updateLock.unlock();
}
}
- if (!foundChildren.isEmpty()) {
- return;
- }
- if (waitNanos <= 0) {
- if (log.isDebugEnabled()) log.debug("0 wait time and no children found, return");
- return;
- }
- TreeMap<String,byte[]> fc = null;
- while (fc == null || fc.isEmpty()) {
- if (watcher.fired) {
- watcher.fired = false;
- fc = fetchZkChildren(watcher, acceptFilter);
- if (!fc.isEmpty()) {
- foundChildren.putAll(fc);
- return;
- }
- }
- updateLock.lockInterruptibly();
- try {
- try {
- changed.await(10, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
- }
- if (zookeeper.isClosed() || !zookeeper.isConnected()) {
- throw new AlreadyClosedException();
- }
- if (timeout.hasTimedOut()) {
- return;
- }
- for (Map.Entry<String,byte[]> child : knownChildren.entrySet()) {
- if (acceptFilter == null || !acceptFilter.test(child.getKey())) {
- foundChildren.put(child.getKey(), child.getValue());
- }
- }
- if (!foundChildren.isEmpty()) {
- return;
- }
- } finally {
- if (updateLock.isHeldByCurrentThread()) {
- updateLock.unlock();
- }
- }
- }
- } finally {
- try {
- zookeeper.getSolrZooKeeper().removeAllWatches(dir, Watcher.WatcherType.Children, false);
- } catch (Exception e) {
- log.info(e.getMessage());
- }
}
}
@@ -672,10 +693,10 @@ public class ZkDistributedQueue implements DistributedQueue {
*
* @return the data at the head of the queue.
*/
- private byte[] firstElement() throws KeeperException {
+ private byte[] firstElement(Predicate<String> acceptFilter) throws KeeperException {
try {
- Map.Entry<String,byte[]> firstChild = firstChild(false);
+ Map.Entry<String,byte[]> firstChild = firstChild(false, acceptFilter);
if (firstChild == null) {
return null;
}
@@ -724,9 +745,9 @@ public class ZkDistributedQueue implements DistributedQueue {
}
}
- private byte[] removeFirst() throws KeeperException {
+ private byte[] removeFirst(Predicate<String> acceptFilter) throws KeeperException {
try {
- Map.Entry<String,byte[]> firstChild = firstChild(true);
+ Map.Entry<String,byte[]> firstChild = firstChild(true, acceptFilter);
if (firstChild == null) {
return null;
}
@@ -734,10 +755,7 @@ public class ZkDistributedQueue implements DistributedQueue {
byte[] data;
updateLock.lockInterruptibly();
try {
- data = knownChildren.get(firstChild.getKey());
- if (data != null) {
- return data;
- }
+ data = knownChildren.remove(firstChild.getKey());
} finally {
if (updateLock.isHeldByCurrentThread()) {
updateLock.unlock();
@@ -746,7 +764,9 @@ public class ZkDistributedQueue implements DistributedQueue {
try {
String path = dir + "/" + firstChild.getKey();
- byte[] result = zookeeper.getData(path, null, null, true);
+ if (data == null) {
+ data = zookeeper.getData(path, null, null, true);
+ }
zookeeper.delete(path, -1, true);
updateLock.lockInterruptibly();
try {
@@ -757,7 +777,7 @@ public class ZkDistributedQueue implements DistributedQueue {
updateLock.unlock();
}
}
- return result;
+ return data;
} catch (KeeperException.NoNodeException e) {
return null;
}
@@ -772,7 +792,6 @@ public class ZkDistributedQueue implements DistributedQueue {
}
@VisibleForTesting class ChildWatcher implements Watcher {
- volatile boolean fired = false;
@Override
public void process(WatchedEvent event) {
@@ -787,13 +806,14 @@ public class ZkDistributedQueue implements DistributedQueue {
updateLock.lock();
try {
fetchZkChildren(null, null);
- fired = true;
changed.signalAll();
} catch (KeeperException | InterruptedException e) {
log.error("", e);
} finally {
updateLock.unlock();
}
+ } else {
+
}
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
index 77f05f8..aa9bc6c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
@@ -25,6 +25,8 @@ import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.overseer.OverseerAction;
@@ -109,7 +111,7 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
try {
List<ZkNodeProps> replicas = getReplicasForSlice(collectionName, slice);
- CountDownLatch cleanupLatch = new CountDownLatch(replicas.size());
+
for (ZkNodeProps r : replicas) {
final ZkNodeProps replica = r.plus(message.getProperties()).plus("parallel", "true").plus(ASYNC, asyncId);
if (log.isInfoEnabled()) {
@@ -119,7 +121,7 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
NamedList deleteResult = new NamedList();
try {
((DeleteReplicaCmd)ocmh.commandMap.get(DELETEREPLICA)).deleteReplica(clusterState, replica, deleteResult, () -> {
- cleanupLatch.countDown();
+
if (deleteResult.get("failure") != null) {
synchronized (results) {
results.add("failure", String.format(Locale.ROOT, "Failed to delete replica for collection=%s shard=%s" +
@@ -136,24 +138,37 @@ public class DeleteShardCmd implements OverseerCollectionMessageHandler.Cmd {
});
} catch (KeeperException e) {
log.warn("Error deleting replica: {}", r, e);
- cleanupLatch.countDown();
throw e;
} catch (Exception e) {
ParWork.propagateInterrupt(e);
log.warn("Error deleting replica: {}", r, e);
- cleanupLatch.countDown();
throw e;
}
}
log.debug("Waiting for delete shard action to complete");
- cleanupLatch.await(1, TimeUnit.MINUTES);
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, DELETESHARD.toLower(), ZkStateReader.COLLECTION_PROP,
collectionName, ZkStateReader.SHARD_ID_PROP, sliceId);
ZkStateReader zkStateReader = ocmh.zkStateReader;
ocmh.overseer.offerStateUpdate(Utils.toJSON(m));
- zkStateReader.waitForState(collectionName, 45, TimeUnit.SECONDS, (c) -> c.getSlice(sliceId) == null);
+
+ AddReplicaCmd.Response response = new AddReplicaCmd.Response();
+
+ if (results.get("failure") == null && results.get("exception") == null) {
+ response.asyncFinalRunner = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ zkStateReader.waitForState(collectionName, 5, TimeUnit.SECONDS, (c) -> c != null && c.getSlice(sliceId) == null);
+ } catch (InterruptedException e) {
+ log.warn("", e);
+ } catch (TimeoutException e) {
+ log.warn("", e);
+ }
+ }
+ };
+ }
log.info("Successfully deleted collection: {} , shard: {}", collectionName, sliceId);
} catch (SolrException e) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 988ccea..701ea53 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -132,13 +132,6 @@ public class ZkStateWriter {
reader.getZkClient().clean(path);
- TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
- timeout.waitFor("", () -> {
- DocCollection rc = reader.getClusterState().getCollectionOrNull(name);
- if (rc == null) return true;
- return false;
- });
-
LinkedHashMap<String,ClusterState.CollectionRef> collStates = new LinkedHashMap<>(prevState.getCollectionStates());
collStates.remove(name);
prevState = new ClusterState(prevState.getLiveNodes(), collStates, prevState.getZNodeVersion());
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index fcd5ab8..d694b2d 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -3110,14 +3110,14 @@ public final class SolrCore implements SolrInfoBean, Closeable {
File dataDir = cd.getInstanceDir().resolve(cd.getDataDir()).toFile();
try {
FileUtils.deleteDirectory(dataDir);
- } catch (IOException e) {
+ } catch (Exception e) {
log.error("Failed to delete data dir for unloaded core: {} dir: {}", cd.getName(), dataDir.getAbsolutePath(), e);
}
}
if (deleteInstanceDir) {
try {
FileUtils.deleteDirectory(cd.getInstanceDir().toFile());
- } catch (IOException e) {
+ } catch (Exception e) {
log.error("Failed to delete instance dir for unloaded core: {} dir: {}", cd.getName(), cd.getInstanceDir(), e);
}
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
index e0a6fe2..f8dcac0 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
@@ -77,14 +77,11 @@ public class DeleteShardTest extends SolrCloudTestCase {
// Can delete an INATIVE shard
CollectionAdminRequest.deleteShard(collection, "shard1").process(cluster.getSolrClient());
- waitForState("Expected 'shard1' to be removed", collection, (n, c) -> {
- return c.getSlice("shard1") == null;
- });
// Can delete a shard under construction
setSliceState(collection, "shard2", Slice.State.CONSTRUCTION);
- waitForState("Expected 'shard2' to be removed", collection, (n, c) -> {
+ waitForState("Expected 'shard2' to be under CONSTRUCTION", collection, (n, c) -> {
return c != null && c.getSlice("shard2") != null && c.getSlice("shard2").getState() == State.CONSTRUCTION;
});
diff --git a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
index 71daf62..8d4b5d9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
@@ -58,38 +58,38 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
DistributedQueue dq = makeDistributedQueue(dqZNode);
// basic ops
- assertNull(dq.poll());
+ assertNull(dq.poll(null));
try {
- dq.remove();
+ dq.remove(null);
fail("NoSuchElementException expected");
} catch (NoSuchElementException expected) {
// expected
}
dq.offer(data);
- assertArrayEquals(dq.peek(100), data);
- assertArrayEquals(dq.remove(), data);
- assertNull(dq.poll());
+ assertArrayEquals(dq.peek(null, 100), data);
+ assertArrayEquals(dq.remove(null), data);
+ assertNull(dq.poll(null));
dq.offer(data);
- assertArrayEquals(dq.take(), data); // waits for data
- assertNull(dq.poll());
+ assertArrayEquals(dq.take(null), data); // waits for data
+ assertNull(dq.poll(null));
dq.offer(data);
- dq.peek(true); // wait until data is definitely there before calling remove
- assertArrayEquals(dq.remove(), data);
- assertNull(dq.poll());
+ dq.peek(null, true); // wait until data is definitely there before calling remove
+ assertArrayEquals(dq.remove(null), data);
+ assertNull(dq.poll(null));
// should block until the background thread makes the offer
(new QueueChangerThread(dq, 200)).start();
- assertNotNull(dq.peek(true));
- assertNotNull(dq.remove());
- assertNull(dq.poll());
+ assertNotNull(dq.peek(null, true));
+ assertNotNull(dq.remove(null));
+ assertNull(dq.poll(null));
// timeout scenario ... background thread won't offer until long after the peek times out
QueueChangerThread qct = new QueueChangerThread(dq, 500);
qct.start();
- assertNull(dq.peek(100));
+ assertNull(dq.peek(null, 100));
qct.join();
}
@@ -105,17 +105,17 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
producer2.offer(data);
producer.offer(data);
producer.offer(data);
- consumer.poll();
+ consumer.poll(null);
assertEquals(2, consumer.getZkStats().getQueueLength());
producer.offer(data);
producer2.offer(data);
- consumer.poll();
+ consumer.poll(null);
// DQ still have elements in their queue, so we should not fetch elements path from Zk
assertEquals(1, consumer.getZkStats().getQueueLength());
- consumer.poll();
- consumer.peek();
+ consumer.poll(null);
+ consumer.peek(null);
assertEquals(2, consumer.getZkStats().getQueueLength());
}
@@ -127,8 +127,8 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
ZkDistributedQueue dq = makeDistributedQueue(dqZNode);
- assertNull(dq.peek());
- Future<String> future = testExecutor.submit(() -> new String(dq.peek(true), UTF8));
+ assertNull(dq.peek(null));
+ Future<String> future = testExecutor.submit(() -> new String(dq.peek(null, true), UTF8));
try {
future.get(1000, TimeUnit.MILLISECONDS);
fail("TimeoutException expected");
@@ -139,10 +139,10 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
// Ultimately trips the watcher, triggering child refresh
dq.offer(testData.getBytes(UTF8));
assertEquals(testData, future.get(1000, TimeUnit.MILLISECONDS));
- assertNotNull(dq.poll());
+ assertNotNull(dq.poll(null));
// After draining the queue, a watcher should be set.
- assertNull(dq.peek(100));
+ assertNull(dq.peek(null, 100));
// assertFalse(dq.isDirty());
@@ -153,7 +153,7 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
assertEquals(0, dq.watcherCount());
// Rerun the earlier test make sure updates are still seen, post reconnection.
- future = testExecutor.submit(() -> new String(dq.peek(true), UTF8));
+ future = testExecutor.submit(() -> new String(dq.peek(null, true), UTF8));
try {
future.get(1000, TimeUnit.MILLISECONDS);
fail("TimeoutException expected");
@@ -164,8 +164,8 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
// Ultimately trips the watcher, triggering child refresh
dq.offer(testData.getBytes(UTF8));
assertEquals(testData, future.get(1000, TimeUnit.MILLISECONDS));
- assertNotNull(dq.poll());
- assertNull(dq.poll());
+ assertNotNull(dq.poll(null));
+ assertNull(dq.poll(null));
}
@Test
@@ -176,21 +176,21 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
assertEquals(1, dq.watcherCount());
assertTrue(dq.peekElements(1, 1, s1 -> true).isEmpty());
assertEquals(1, dq.watcherCount());
- assertNull(dq.peek());
+ assertNull(dq.peek(null));
assertEquals(1, dq.watcherCount());
- assertNull(dq.peek(1));
+ assertNull(dq.peek(null, 1));
assertEquals(1, dq.watcherCount());
dq.offer("hello world".getBytes(UTF8));
- assertNotNull(dq.peek()); // synchronously available
+ assertNotNull(dq.peek(null)); // synchronously available
// dirty and watcher state indeterminate here, race with watcher
Thread.sleep(100); // watcher should have fired now
- assertNotNull(dq.peek());
+ assertNotNull(dq.peek(null));
// in case of race condition, childWatcher is kicked off after peek()
if (dq.watcherCount() == 0) {
- dq.poll();
+ dq.poll(null);
dq.offer("hello world".getBytes(UTF8));
- dq.peek();
+ dq.peek(null);
}
assertEquals(1, dq.watcherCount());
assertFalse(dq.peekElements(1, 1, s -> true).isEmpty());
@@ -205,8 +205,8 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
for (int i = 0; i < 100; i++) {
byte[] data = String.valueOf(i).getBytes(UTF8);
dq.offer(data);
- assertNotNull(dq.peek());
- dq.poll();
+ assertNotNull(dq.peek(null));
+ dq.poll(null);
dq.peekElements(1, 1, s -> true);
}
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index d52dc89..0f7e2e0 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -842,14 +842,14 @@ public class OverseerTest extends SolrTestCaseJ4 {
TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while(!timeOut.hasTimedOut()) {
- if (q.peek() == null) {
+ if (q.peek(null) == null) {
break;
}
Thread.sleep(50);
}
- assertTrue(showQpeek(workQueue), workQueue.peek() == null);
- assertTrue(showQpeek(q), q.peek() == null);
+ assertTrue(showQpeek(workQueue), workQueue.peek(null) == null);
+ assertTrue(showQpeek(q), q.peek(null) == null);
} finally {
close(overseerClient);
close(reader);
@@ -860,7 +860,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
if (q == null) {
return "";
}
- byte[] bytes = q.peek();
+ byte[] bytes = q.peek(null);
if (bytes == null) {
return "";
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistributedQueue.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistributedQueue.java
index 9a31bff..1a6aed9 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistributedQueue.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/DistributedQueue.java
@@ -26,17 +26,17 @@ import org.apache.solr.common.util.Pair;
* Distributed queue component. Methods largely follow those in {@link java.util.Queue}.
*/
public interface DistributedQueue {
- byte[] peek() throws Exception;
+ byte[] peek(Predicate<String> acceptFilter) throws Exception;
- byte[] peek(boolean block) throws Exception;
+ byte[] peek(Predicate<String> acceptFilter, boolean block) throws Exception;
- byte[] peek(long wait) throws Exception;
+ byte[] peek(Predicate<String> acceptFilter, long wait) throws Exception;
- byte[] poll() throws Exception;
+ byte[] poll(Predicate<String> acceptFilter) throws Exception;
- byte[] remove() throws Exception;
+ byte[] remove(Predicate<String> acceptFilter) throws Exception;
- byte[] take() throws Exception;
+ byte[] take(Predicate<String> acceptFilter) throws Exception;
void offer(byte[] data) throws Exception;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
index 5e59af3..6754d70 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java
@@ -17,6 +17,7 @@
package org.apache.solr.client.solrj.request;
import java.io.IOException;
+import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -53,6 +54,8 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
@@ -75,7 +78,7 @@ import static org.apache.solr.common.params.CollectionAdminParams.WITH_COLLECTIO
* @since solr 4.5
*/
public abstract class CollectionAdminRequest<T extends CollectionAdminResponse> extends SolrRequest<T> implements V2RequestSupport, MapWriter {
-
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/**
* The set of modifiable collection properties
*/
@@ -452,12 +455,12 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
private Create(String collection, String config, String routerName, Integer numShards, String shards, Integer numNrtReplicas, Integer numTlogReplicas, Integer numPullReplicas) {
super(CollectionAction.CREATE, SolrIdentifierValidator.validateCollectionName(collection));
// NOTE: there's very little we can assert about the args because nothing but "collection" is required by the server
- if ((null != shards) && (null != numShards && numShards != 0)) {
+ if ((null != shards) && (null != numShards)) {
throw new IllegalArgumentException("Can not specify both a numShards and a list of shards");
}
this.configName = config;
this.routerName = routerName;
- this.numShards = numShards;;
+ this.numShards = numShards;
this.setShards(shards);
this.nrtReplicas = numNrtReplicas;
this.tlogReplicas = numTlogReplicas;
@@ -482,6 +485,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
}
public int getTotaleReplicaCount() {
+ log.info("nrtReplicas={} tlogReplicas={} pullReplicas={}", nrtReplicas, tlogReplicas, pullReplicas);
int cnt = (nrtReplicas == null ? 0 : nrtReplicas) + (tlogReplicas == null ? 0 : tlogReplicas) + (pullReplicas == null ? 0 : pullReplicas);
return cnt;
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/TimeOut.java b/solr/solrj/src/java/org/apache/solr/common/util/TimeOut.java
index 2256771..130e94d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/TimeOut.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/TimeOut.java
@@ -26,11 +26,13 @@ public class TimeOut {
private final long timeoutAt, startTime;
private final TimeSource timeSource;
+ private final long interval;
public TimeOut(long interval, TimeUnit unit, TimeSource timeSource) {
this.timeSource = timeSource;
startTime = timeSource.getTimeNs();
- this.timeoutAt = startTime + NANOSECONDS.convert(interval, unit);
+ this.interval = NANOSECONDS.convert(interval, unit);
+ this.timeoutAt = startTime + interval;
}
public boolean hasTimedOut() {
@@ -41,6 +43,10 @@ public class TimeOut {
timeSource.sleep(ms);
}
+ public long getInterval(TimeUnit unit) {
+ return unit.convert(interval, NANOSECONDS);
+ }
+
public long timeLeft(TimeUnit unit) {
return unit.convert(timeoutAt - timeSource.getTimeNs(), NANOSECONDS);
}
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
index c4ea258..0b51b90 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java
@@ -926,7 +926,7 @@ public class MiniSolrCloudCluster {
}
public void waitForActiveCollection(String collection, long wait, TimeUnit unit, int shards, int totalReplicas, boolean exact) {
- log.info("waitForActiveCollection: {}", collection);
+ log.info("waitForActiveCollection: {} shards={} replicas={}, exact={}", collection, shards, totalReplicas, exact);
CollectionStatePredicate predicate = BaseCloudSolrClient.expectedShardsAndActiveReplicas(shards, totalReplicas, exact);
AtomicReference<DocCollection> state = new AtomicReference<>();