You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this extraction.
Posted to commits@lucene.apache.org by dr...@apache.org on 2016/06/09 20:05:54 UTC
lucene-solr:branch_5_5: SOLR-9191: OverseerTaskQueue.peekTopN() fatally flawed
Repository: lucene-solr
Updated Branches:
refs/heads/branch_5_5 cb42e6d63 -> 9f5fae7ed
SOLR-9191: OverseerTaskQueue.peekTopN() fatally flawed
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/9f5fae7e
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/9f5fae7e
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/9f5fae7e
Branch: refs/heads/branch_5_5
Commit: 9f5fae7ed82cd565d991ed92f9af4ca23eb7bac2
Parents: cb42e6d
Author: Scott Blum <dr...@gmail.com>
Authored: Tue Jun 7 01:52:16 2016 -0400
Committer: Scott Blum <dr...@apache.org>
Committed: Thu Jun 9 16:05:48 2016 -0400
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../org/apache/solr/cloud/DistributedQueue.java | 96 ++++++++++++--------
.../solr/cloud/OverseerTaskProcessor.java | 6 +-
.../apache/solr/cloud/OverseerTaskQueue.java | 56 +++++-------
.../apache/solr/cloud/DistributedQueueTest.java | 52 ++++++++++-
5 files changed, 138 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9f5fae7e/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index c0211de..27cfd02 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -14,6 +14,8 @@ Bug Fixes
* SOLR-9198: config APIs unable to add multiple values with same name (noble)
+* SOLR-9191: OverseerTaskQueue.peekTopN() fatally flawed (Scott Blum, Noble Paul)
+
======================= 5.5.1 =======================
Bug Fixes
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9f5fae7e/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
index d3bf2e4..7576ae5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
@@ -17,10 +17,10 @@
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.NoSuchElementException;
-import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
@@ -28,11 +28,12 @@ import java.util.concurrent.locks.ReentrantLock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.base.Predicate;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
+import org.apache.solr.common.util.Pair;
import org.apache.solr.util.stats.TimerContext;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -81,21 +82,15 @@ public class DistributedQueue {
private TreeSet<String> knownChildren = new TreeSet<>();
/**
- * Used to wait on a non-empty queue; you must hold {@link #updateLock} and verify that
- * {@link #knownChildren} is empty before waiting on this condition.
+ * Used to wait on ZK changes to the child list; you must hold {@link #updateLock} before waiting on this condition.
*/
- private final Condition notEmpty = updateLock.newCondition();
+ private final Condition changed = updateLock.newCondition();
/**
- * If non-null, the last watcher to listen for child changes.
+ * If non-null, the last watcher to listen for child changes. If null, the in-memory contents are dirty.
*/
private ChildWatcher lastWatcher = null;
- /**
- * If true, ZK's child list probably doesn't match what's in memory.
- */
- private boolean isDirty = true;
-
public DistributedQueue(SolrZkClient zookeeper, String dir) {
this(zookeeper, dir, new Overseer.Stats());
}
@@ -166,7 +161,7 @@ public class DistributedQueue {
if (result != null) {
return result;
}
- waitNanos = notEmpty.awaitNanos(waitNanos);
+ waitNanos = changed.awaitNanos(waitNanos);
}
return null;
} finally {
@@ -223,7 +218,7 @@ public class DistributedQueue {
if (result != null) {
return result;
}
- notEmpty.await();
+ changed.await();
}
} finally {
updateLock.unlock();
@@ -274,25 +269,19 @@ public class DistributedQueue {
private String firstChild(boolean remove) throws KeeperException, InterruptedException {
updateLock.lockInterruptibly();
try {
- // Try to fetch the first in-memory child.
- if (!knownChildren.isEmpty()) {
+ // If we're not in a dirty state, and we have in-memory children, return from in-memory.
+ if (lastWatcher != null && !knownChildren.isEmpty()) {
return remove ? knownChildren.pollFirst() : knownChildren.first();
}
- if (lastWatcher != null && !isDirty) {
- // No children, no known updates, and a watcher is already set; nothing we can do.
- return null;
- }
-
// Try to fetch an updated list of children from ZK.
ChildWatcher newWatcher = new ChildWatcher();
knownChildren = fetchZkChildren(newWatcher);
lastWatcher = newWatcher; // only set after fetchZkChildren returns successfully
- isDirty = false;
if (knownChildren.isEmpty()) {
return null;
}
- notEmpty.signalAll();
+ changed.signalAll();
return remove ? knownChildren.pollFirst() : knownChildren.first();
} finally {
updateLock.unlock();
@@ -326,26 +315,63 @@ public class DistributedQueue {
}
/**
- * Return the currently-known set of children from memory. If there are no children,
- * waits up to {@code waitMillis} for at least one child to become available. May
- * update the set of known children.
+ * Return the currently-known set of elements, using child names from memory. If no children are found, or no
+ * children pass {@code acceptFilter}, waits up to {@code waitMillis} for at least one child to become available.
+ * <p/>
+ * Package-private to support {@link OverseerTaskQueue} specifically.
*/
- SortedSet<String> getChildren(long waitMillis) throws KeeperException, InterruptedException {
+ Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
+ List<String> foundChildren = new ArrayList<>();
long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
- while (waitNanos > 0) {
+ while (true) {
// Trigger a fetch if needed.
- firstElement();
+ firstChild(false);
+
updateLock.lockInterruptibly();
try {
- if (!knownChildren.isEmpty()) {
- return new TreeSet<>(knownChildren);
+ for (String child : knownChildren) {
+ if (acceptFilter.apply(child)) {
+ foundChildren.add(child);
+ }
}
- waitNanos = notEmpty.awaitNanos(waitNanos);
+ if (!foundChildren.isEmpty()) {
+ break;
+ }
+ if (waitNanos <= 0) {
+ break;
+ }
+ waitNanos = changed.awaitNanos(waitNanos);
} finally {
updateLock.unlock();
}
+
+ if (!foundChildren.isEmpty()) {
+ break;
+ }
+ }
+
+ // Technically we could restart the method if we fail to actually obtain any valid children
+ // from ZK, but this is a super rare case, and the latency of the ZK fetches would require
+ // much more sophisticated waitNanos tracking.
+ List<Pair<String, byte[]>> result = new ArrayList<>();
+ for (String child : foundChildren) {
+ if (result.size() >= max) {
+ break;
+ }
+ try {
+ byte[] data = zookeeper.getData(dir + "/" + child, null, null, true);
+ result.add(new Pair<>(child, data));
+ } catch (KeeperException.NoNodeException e) {
+ // Another client deleted the node first, remove the in-memory and continue.
+ updateLock.lockInterruptibly();
+ try {
+ knownChildren.remove(child);
+ } finally {
+ updateLock.unlock();
+ }
+ }
}
- return ImmutableSortedSet.of();
+ return result;
}
/**
@@ -419,10 +445,8 @@ public class DistributedQueue {
if (lastWatcher == this) {
lastWatcher = null;
}
- // Do no updates in this thread, just signal state back to client threads.
- isDirty = true;
// optimistically signal any waiters that the queue may not be empty now, so they can wake up and retry
- notEmpty.signalAll();
+ changed.signalAll();
} finally {
updateLock.unlock();
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9f5fae7e/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index 26a90cb..20f0e88 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -198,9 +198,9 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
cleanUpWorkQueue();
List<QueueEvent> heads = workQueue.peekTopN(MAX_PARALLEL_TASKS, runningZKTasks, 2000L);
-
- if (heads == null)
+ if (heads.isEmpty()) {
continue;
+ }
log.debug("Got {} tasks from work-queue : [{}]", heads.size(), heads.toString());
@@ -495,8 +495,8 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
}
}
-
messageHandler.unmarkExclusiveTask(taskKey, operation, message);
+ workQueue.remove(head);
}
private void resetTaskWithException(OverseerMessageHandler messageHandler, String id, String asyncId, String taskKey, ZkNodeProps message) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9f5fae7e/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index ea219d3..d1a85ac 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -17,14 +17,15 @@
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
+import com.google.common.base.Predicate;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.util.Pair;
import org.apache.solr.util.stats.TimerContext;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -61,7 +62,7 @@ public class OverseerTaskQueue extends DistributedQueue {
List<String> childNames = zookeeper.getChildren(dir, null, true);
stats.setQueueLength(childNames.size());
for (String childName : childNames) {
- if (childName != null) {
+ if (childName != null && childName.startsWith(PREFIX)) {
try {
byte[] data = zookeeper.getData(dir + "/" + childName, null, null, true);
if (data != null) {
@@ -82,9 +83,8 @@ public class OverseerTaskQueue extends DistributedQueue {
/**
* Remove the event and save the response into the other path.
- *
*/
- public byte[] remove(QueueEvent event) throws KeeperException,
+ public void remove(QueueEvent event) throws KeeperException,
InterruptedException {
TimerContext time = stats.time(dir + "_remove_event");
try {
@@ -97,9 +97,10 @@ public class OverseerTaskQueue extends DistributedQueue {
LOG.info("Response ZK path: " + responsePath + " doesn't exist."
+ " Requestor may have disconnected from ZooKeeper");
}
- byte[] data = zookeeper.getData(path, null, null, true);
- zookeeper.delete(path, -1, true);
- return data;
+ try {
+ zookeeper.delete(path, -1, true);
+ } catch (KeeperException.NoNodeException ignored) {
+ }
} finally {
time.stop();
}
@@ -211,49 +212,36 @@ public class OverseerTaskQueue extends DistributedQueue {
}
}
- public List<QueueEvent> peekTopN(int n, Set<String> excludeSet, long waitMillis)
+ public List<QueueEvent> peekTopN(int n, final Set<String> excludeSet, long waitMillis)
throws KeeperException, InterruptedException {
ArrayList<QueueEvent> topN = new ArrayList<>();
LOG.debug("Peeking for top {} elements. ExcludeSet: {}", n, excludeSet);
- TimerContext time = null;
+ TimerContext time;
if (waitMillis == Long.MAX_VALUE) time = stats.time(dir + "_peekTopN_wait_forever");
else time = stats.time(dir + "_peekTopN_wait" + waitMillis);
try {
- for (String headNode : getChildren(waitMillis)) {
- if (topN.size() < n) {
- try {
- String id = dir + "/" + headNode;
- if (excludeSet.contains(id)) continue;
- QueueEvent queueEvent = new QueueEvent(id,
- zookeeper.getData(dir + "/" + headNode, null, null, true), null);
- topN.add(queueEvent);
- } catch (KeeperException.NoNodeException e) {
- // Another client removed the node first, try next
- }
- } else {
- if (topN.size() >= 1) {
- printQueueEventsListElementIds(topN);
- return topN;
- }
+ for (Pair<String, byte[]> element : peekElements(n, waitMillis, new Predicate<String>() {
+ @Override
+ public boolean apply(String child) {
+ return !excludeSet.contains(dir + "/" + child);
}
+ })) {
+ topN.add(new QueueEvent(dir + "/" + element.getKey(),
+ element.getValue(), null));
}
-
- if (topN.size() > 0 ) {
- printQueueEventsListElementIds(topN);
- return topN;
- }
- return null;
+ printQueueEventsListElementIds(topN);
+ return topN;
} finally {
time.stop();
}
}
private static void printQueueEventsListElementIds(ArrayList<QueueEvent> topN) {
- if(LOG.isDebugEnabled()) {
- StringBuffer sb = new StringBuffer("[");
- for(QueueEvent queueEvent: topN) {
+ if (LOG.isDebugEnabled() && !topN.isEmpty()) {
+ StringBuilder sb = new StringBuilder("[");
+ for (QueueEvent queueEvent : topN) {
sb.append(queueEvent.getId()).append(", ");
}
sb.append("]");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9f5fae7e/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
index 4c67879..88bd461 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
@@ -20,11 +20,12 @@ import java.nio.charset.Charset;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.util.ExecutorUtil;
@@ -148,6 +149,55 @@ public class DistributedQueueTest extends SolrTestCaseJ4 {
assertNull(dq.poll());
}
+ @Test
+ public void testPeekElements() throws Exception {
+ String dqZNode = "/distqueue/test";
+ final byte[] data = "hello world".getBytes(UTF8);
+
+ final DistributedQueue dq = makeDistributedQueue(dqZNode);
+
+ // Populate with data.
+ dq.offer(data);
+ dq.offer(data);
+ dq.offer(data);
+
+ // Should be able to get 0, 1, 2, or 3 instantly
+ for (int i = 0; i <= 3; ++i) {
+ assertEquals(i, dq.peekElements(i, 0, Predicates.<String>alwaysTrue()).size());
+ }
+
+ // Asking for more should return only 3.
+ assertEquals(3, dq.peekElements(4, 0, Predicates.<String>alwaysTrue()).size());
+
+ // If we filter everything out, we should block for the full time.
+ long start = System.nanoTime();
+ assertEquals(0, dq.peekElements(4, 1000, Predicates.<String>alwaysFalse()).size());
+ assertTrue(System.nanoTime() - start >= TimeUnit.MILLISECONDS.toNanos(500));
+
+ // If someone adds a new matching element while we're waiting, we should return immediately.
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(500);
+ dq.offer(data);
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ });
+ start = System.nanoTime();
+ assertEquals(1, dq.peekElements(4, 2000, new Predicate<String>() {
+ @Override
+ public boolean apply(String child) {
+ // The 4th element in the queue will end with a "3".
+ return child.endsWith("3");
+ }
+ }).size());
+ assertTrue(System.nanoTime() - start < TimeUnit.MILLISECONDS.toNanos(1000));
+ assertTrue(System.nanoTime() - start >= TimeUnit.MILLISECONDS.toNanos(250));
+ }
+
private void forceSessionExpire() throws InterruptedException, TimeoutException {
long sessionId = zkClient.getSolrZooKeeper().getSessionId();
zkServer.expire(sessionId);