You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2015/08/20 16:05:37 UTC
svn commit: r1696789 - in /lucene/dev/branches/branch_5x: ./ solr/
solr/core/ solr/core/src/java/org/apache/solr/cloud/
solr/core/src/java/org/apache/solr/handler/admin/
solr/core/src/java/org/apache/solr/update/processor/
solr/core/src/test/org/apache...
Author: shalin
Date: Thu Aug 20 14:05:36 2015
New Revision: 1696789
URL: http://svn.apache.org/r1696789
Log:
SOLR-6760: New optimized DistributedQueue implementation for overseer
Added:
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionQueue.java
- copied unchanged from r1696706, lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionQueue.java
lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionQueueTest.java
- copied unchanged from r1696706, lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionQueueTest.java
Modified:
lucene/dev/branches/branch_5x/ (props changed)
lucene/dev/branches/branch_5x/solr/ (props changed)
lucene/dev/branches/branch_5x/solr/CHANGES.txt (contents, props changed)
lucene/dev/branches/branch_5x/solr/core/ (props changed)
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerProcessor.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
Modified: lucene/dev/branches/branch_5x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/CHANGES.txt?rev=1696789&r1=1696788&r2=1696789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_5x/solr/CHANGES.txt Thu Aug 20 14:05:36 2015
@@ -74,6 +74,10 @@ Optimizations
is anywhere from 20% to over 100% faster and produces less garbage on average.
(yonik)
+* SOLR-6760: New optimized DistributedQueue implementation for overseer increases
+ message processing performance by ~470%.
+ (Noble Paul, Scott Blum, shalin)
+
Other Changes
----------------------
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1696789&r1=1696788&r2=1696789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Thu Aug 20 14:05:36 2015
@@ -18,19 +18,22 @@
package org.apache.solr.cloud;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.TreeMap;
-
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSortedSet;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
-import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.util.stats.TimerContext;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -40,23 +43,60 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A distributed queue from zk recipes.
+ * A distributed queue.
*/
public class DistributedQueue {
private static final Logger LOG = LoggerFactory.getLogger(DistributedQueue.class);
-
- private static long DEFAULT_TIMEOUT = 5*60*1000;
-
- private final String dir;
-
- private SolrZkClient zookeeper;
-
- private final String prefix = "qn-";
-
- private final String response_prefix = "qnr-" ;
- private final Overseer.Stats stats;
-
+ static final String PREFIX = "qn-";
+
+ /**
+ * Theory of operation:
+ * <p>
+ * Under ordinary circumstances we neither watch nor poll for children in ZK.
+ * Instead we keep an in-memory list of known child names. When the in-memory
+ * list is exhausted, we then fetch from ZK.
+ * <p>
+ * We only bother setting a child watcher when the queue has no children in ZK.
+ */
+ private static final Object _IMPLEMENTATION_NOTES = null;
+
+ final String dir;
+
+ final SolrZkClient zookeeper;
+
+ final Overseer.Stats stats;
+
+ /**
+ * A lock that guards all of the mutable state that follows.
+ */
+ private final ReentrantLock updateLock = new ReentrantLock();
+
+ /**
+ * Contains the last set of children fetched from ZK. Elements are removed from the head of
+ * this in-memory set as they are consumed from the queue. Due to the distributed nature
+ * of the queue, elements may appear in this set whose underlying nodes have been consumed in ZK.
+ * Therefore, methods like {@link #peek()} have to double-check actual node existence, and methods
+ * like {@link #poll()} must resolve any races by attempting to delete the underlying node.
+ */
+ private TreeSet<String> knownChildren = new TreeSet<>();
+
+ /**
+ * Used to wait on a non-empty queue; you must hold {@link #updateLock} and verify that
+ * {@link #knownChildren} is empty before waiting on this condition.
+ */
+ private final Condition notEmpty = updateLock.newCondition();
+
+ /**
+ * If non-null, the last watcher to listen for child changes.
+ */
+ private ChildWatcher lastWatcher = null;
+
+ /**
+ * If true, ZK's child list probably doesn't match what's in memory.
+ */
+ private boolean isDirty = true;
+
public DistributedQueue(SolrZkClient zookeeper, String dir) {
this(zookeeper, dir, new Overseer.Stats());
}
@@ -77,554 +117,312 @@ public class DistributedQueue {
this.zookeeper = zookeeper;
this.stats = stats;
}
-
- /**
- * Returns a Map of the children, ordered by id.
- *
- * @param watcher
- * optional watcher on getChildren() operation.
- * @return map from id to child name for all children
- */
- private TreeMap<Long,String> orderedChildren(Watcher watcher)
- throws KeeperException, InterruptedException {
- TreeMap<Long,String> orderedChildren = new TreeMap<>();
-
- List<String> childNames = zookeeper.getChildren(dir, watcher, true);
- stats.setQueueLength(childNames.size());
- for (String childName : childNames) {
- try {
- // Check format
- if (!childName.regionMatches(0, prefix, 0, prefix.length())) {
- LOG.debug("Found child node with improper name: " + childName);
- continue;
- }
- String suffix = childName.substring(prefix.length());
- Long childId = new Long(suffix);
- orderedChildren.put(childId, childName);
- } catch (NumberFormatException e) {
- LOG.warn("Found child node with improper format : " + childName + " "
- + e, e);
- }
- }
-
- return orderedChildren;
- }
-
/**
- * Returns true if the queue contains a task with the specified async id.
+ * Returns the data at the first element of the queue, or null if the queue is
+ * empty.
+ *
+ * @return data at the first element of the queue, or null.
*/
- public boolean containsTaskWithRequestId(String requestIdKey, String requestId)
- throws KeeperException, InterruptedException {
-
- List<String> childNames = zookeeper.getChildren(dir, null, true);
- stats.setQueueLength(childNames.size());
- for (String childName : childNames) {
- if (childName != null) {
- try {
- byte[] data = zookeeper.getData(dir + "/" + childName, null, null, true);
- if (data != null) {
- ZkNodeProps message = ZkNodeProps.load(data);
- if (message.containsKey(requestIdKey)) {
- LOG.debug(">>>> {}", message.get(requestIdKey));
- if(message.get(requestIdKey).equals(requestId)) return true;
- }
- }
- } catch (KeeperException.NoNodeException e) {
- // Another client removed the node first, try next
- }
- }
+ public byte[] peek() throws KeeperException, InterruptedException {
+ TimerContext time = stats.time(dir + "_peek");
+ try {
+ return firstElement();
+ } finally {
+ time.stop();
}
-
- return false;
}
-
/**
- * Return the head of the queue without modifying the queue.
- *
- * @return the data at the head of the queue.
+ * Returns the data at the first element of the queue, or null if the queue is
+ * empty and block is false.
+ *
+ * @param block if true, blocks until an element enters the queue
+ * @return data at the first element of the queue, or null.
*/
- private QueueEvent element() throws KeeperException,
- InterruptedException {
- TreeMap<Long,String> orderedChildren;
-
- // element, take, and remove follow the same pattern.
- // We want to return the child node with the smallest sequence number.
- // Since other clients are remove()ing and take()ing nodes concurrently,
- // the child with the smallest sequence number in orderedChildren might be
- // gone by the time we check.
- // We don't call getChildren again until we have tried the rest of the nodes
- // in sequence order.
- while (true) {
- try {
- orderedChildren = orderedChildren(null);
- } catch (KeeperException.NoNodeException e) {
- return null;
- }
- if (orderedChildren.size() == 0) return null;
-
- for (String headNode : orderedChildren.values()) {
- if (headNode != null) {
- try {
- return new QueueEvent(dir + "/" + headNode, zookeeper.getData(dir + "/" + headNode, null, null, true), null);
- } catch (KeeperException.NoNodeException e) {
- // Another client removed the node first, try next
- }
- }
- }
- }
+ public byte[] peek(boolean block) throws KeeperException, InterruptedException {
+ return block ? peek(Long.MAX_VALUE) : peek();
}
-
+
/**
- * Attempts to remove the head of the queue and return it.
- *
- * @return The former head of the queue
+ * Returns the data at the first element of the queue, or null if the queue is
+ * empty after wait ms.
+ *
+ * @param wait max wait time in ms.
+ * @return data at the first element of the queue, or null.
*/
- public byte[] remove() throws NoSuchElementException, KeeperException,
- InterruptedException {
- TreeMap<Long,String> orderedChildren;
- // Same as for element. Should refactor this.
- TimerContext time = stats.time(dir + "_remove");
+ public byte[] peek(long wait) throws KeeperException, InterruptedException {
+ Preconditions.checkArgument(wait > 0);
+ TimerContext time;
+ if (wait == Long.MAX_VALUE) {
+ time = stats.time(dir + "_peek_wait_forever");
+ } else {
+ time = stats.time(dir + "_peek_wait" + wait);
+ }
+ updateLock.lockInterruptibly();
try {
- while (true) {
- try {
- orderedChildren = orderedChildren(null);
- } catch (KeeperException.NoNodeException e) {
- throw new NoSuchElementException();
- }
- if (orderedChildren.size() == 0) throw new NoSuchElementException();
-
- for (String headNode : orderedChildren.values()) {
- String path = dir + "/" + headNode;
- try {
- byte[] data = zookeeper.getData(path, null, null, true);
- zookeeper.delete(path, -1, true);
- return data;
- } catch (KeeperException.NoNodeException e) {
- // Another client deleted the node first.
- }
+ long waitNanos = TimeUnit.MILLISECONDS.toNanos(wait);
+ while (waitNanos > 0) {
+ byte[] result = firstElement();
+ if (result != null) {
+ return result;
}
-
+ waitNanos = notEmpty.awaitNanos(waitNanos);
}
+ return null;
} finally {
+ updateLock.unlock();
time.stop();
}
}
-
+
/**
- * Remove the event and save the response into the other path.
- *
+ * Attempts to remove the head of the queue and return it. Returns null if the
+ * queue is empty.
+ *
+ * @return Head of the queue or null.
*/
- public byte[] remove(QueueEvent event) throws KeeperException,
- InterruptedException {
- TimerContext time = stats.time(dir + "_remove_event");
+ public byte[] poll() throws KeeperException, InterruptedException {
+ TimerContext time = stats.time(dir + "_poll");
try {
- String path = event.getId();
- String responsePath = dir + "/" + response_prefix
- + path.substring(path.lastIndexOf("-") + 1);
- if (zookeeper.exists(responsePath, true)) {
- zookeeper.setData(responsePath, event.getBytes(), true);
- }
- byte[] data = zookeeper.getData(path, null, null, true);
- zookeeper.delete(path, -1, true);
- return data;
+ return removeFirst();
} finally {
time.stop();
}
}
/**
- * Watcher that blocks until a WatchedEvent occurs for a znode.
+ * Attempts to remove the head of the queue and return it.
+ *
+ * @return The former head of the queue
*/
- private final class LatchWatcher implements Watcher {
-
- private final Object lock;
- private WatchedEvent event;
- private Event.EventType latchEventType;
-
- LatchWatcher(Object lock) {
- this(lock, null);
- }
-
- LatchWatcher(Event.EventType eventType) {
- this(new Object(), eventType);
- }
-
- LatchWatcher(Object lock, Event.EventType eventType) {
- this.lock = lock;
- this.latchEventType = eventType;
- }
-
- @Override
- public void process(WatchedEvent event) {
- Event.EventType eventType = event.getType();
- // None events are ignored
- // If latchEventType is not null, only fire if the type matches
- if (eventType != Event.EventType.None && (latchEventType == null || eventType == latchEventType)) {
- LOG.info("{} fired on path {} state {}", eventType, event.getPath(), event.getState());
- synchronized (lock) {
- this.event = event;
- lock.notifyAll();
- }
- }
- }
-
- public void await(long timeout) throws InterruptedException {
- synchronized (lock) {
- if (this.event != null) return;
- lock.wait(timeout);
- }
- }
-
- public WatchedEvent getWatchedEvent() {
- return event;
- }
- }
-
- // we avoid creating *many* watches in some cases
- // by saving the childrenWatcher and the children associated - see SOLR-6336
- private LatchWatcher childrenWatcher;
- private TreeMap<Long,String> fetchedChildren;
- private final Object childrenWatcherLock = new Object();
-
- private Map<Long, String> getChildren(long wait) throws InterruptedException, KeeperException
- {
- LatchWatcher watcher;
- TreeMap<Long,String> children;
- synchronized (childrenWatcherLock) {
- watcher = childrenWatcher;
- children = fetchedChildren;
- }
-
- if (watcher == null || watcher.getWatchedEvent() != null) {
- // this watcher is only interested in child change events
- watcher = new LatchWatcher(Watcher.Event.EventType.NodeChildrenChanged);
- while (true) {
- try {
- children = orderedChildren(watcher);
- break;
- } catch (KeeperException.NoNodeException e) {
- zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
- // go back to the loop and try again
- }
- }
- synchronized (childrenWatcherLock) {
- childrenWatcher = watcher;
- fetchedChildren = children;
- }
- }
-
- while (true) {
- if (!children.isEmpty()) break;
- watcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
- if (watcher.getWatchedEvent() != null) {
- children = orderedChildren(null);
+ public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {
+ TimerContext time = stats.time(dir + "_remove");
+ try {
+ byte[] result = removeFirst();
+ if (result == null) {
+ throw new NoSuchElementException();
}
- if (wait != Long.MAX_VALUE) break;
+ return result;
+ } finally {
+ time.stop();
}
- return Collections.unmodifiableMap(children);
}
/**
* Removes the head of the queue and returns it, blocks until it succeeds.
- *
+ *
* @return The former head of the queue
*/
public byte[] take() throws KeeperException, InterruptedException {
// Same as for element. Should refactor this.
TimerContext timer = stats.time(dir + "_take");
+ updateLock.lockInterruptibly();
try {
- Map<Long, String> orderedChildren = getChildren(Long.MAX_VALUE);
- for (String headNode : orderedChildren.values()) {
- String path = dir + "/" + headNode;
- try {
- byte[] data = zookeeper.getData(path, null, null, true);
- zookeeper.delete(path, -1, true);
- return data;
- } catch (KeeperException.NoNodeException e) {
- // Another client deleted the node first.
+ while (true) {
+ byte[] result = removeFirst();
+ if (result != null) {
+ return result;
}
+ notEmpty.await();
}
- return null; // shouldn't really reach here..
} finally {
+ updateLock.unlock();
timer.stop();
}
}
-
+
/**
- * Inserts data into queue.
- *
- * @return true if data was successfully added
+ * Inserts data into queue. Successfully calling this method does NOT guarantee
+ * that the element will be immediately available in the in-memory queue. In particular,
+ * calling this method on an empty queue will not necessarily cause {@link #poll()} to
+ * return the offered element. Use a blocking method if you must wait for the offered
+ * element to become visible.
*/
- public boolean offer(byte[] data) throws KeeperException,
- InterruptedException {
+ public void offer(byte[] data) throws KeeperException, InterruptedException {
TimerContext time = stats.time(dir + "_offer");
try {
- return createData(dir + "/" + prefix, data,
- CreateMode.PERSISTENT_SEQUENTIAL) != null;
- } finally {
- time.stop();
- }
- }
-
- /**
- * Inserts data into zookeeper.
- *
- * @return true if data was successfully added
- */
- private String createData(String path, byte[] data, CreateMode mode)
- throws KeeperException, InterruptedException {
- for (;;) {
- try {
- return zookeeper.create(path, data, mode, true);
- } catch (KeeperException.NoNodeException e) {
+ while (true) {
try {
- zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
- } catch (KeeperException.NodeExistsException ne) {
- // someone created it
+ // We don't need to explicitly set isDirty here; if there is a watcher, it will
+ // see the update and set the bit itself; if there is no watcher we can defer
+ // the update anyway.
+ zookeeper.create(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL, true);
+ return;
+ } catch (KeeperException.NoNodeException e) {
+ try {
+ zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
+ } catch (KeeperException.NodeExistsException ne) {
+ // someone created it
+ }
}
}
+ } finally {
+ time.stop();
}
}
-
+
+ public Overseer.Stats getStats() {
+ return stats;
+ }
+
/**
- * Offer the data and wait for the response
- *
+ * Returns the name if the first known child node, or {@code null} if the queue is empty.
+ * This is the only place {@link #knownChildren} is ever updated!
+ * The caller must double check that the actual node still exists, since the in-memory
+ * list is inherently stale.
*/
- public QueueEvent offer(byte[] data, long timeout) throws KeeperException,
- InterruptedException {
- TimerContext time = stats.time(dir + "_offer");
+ private String firstChild(boolean remove) throws KeeperException, InterruptedException {
+ updateLock.lockInterruptibly();
try {
- String path = createData(dir + "/" + prefix, data,
- CreateMode.PERSISTENT_SEQUENTIAL);
- String watchID = createData(
- dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
- null, CreateMode.EPHEMERAL);
-
- Object lock = new Object();
- LatchWatcher watcher = new LatchWatcher(lock);
- synchronized (lock) {
- if (zookeeper.exists(watchID, watcher, true) != null) {
- watcher.await(timeout);
- }
+ // Try to fetch the first in-memory child.
+ if (!knownChildren.isEmpty()) {
+ return remove ? knownChildren.pollFirst() : knownChildren.first();
+ }
+
+ if (lastWatcher != null && !isDirty) {
+ // No children, no known updates, and a watcher is already set; nothing we can do.
+ return null;
}
- byte[] bytes = zookeeper.getData(watchID, null, null, true);
- zookeeper.delete(watchID, -1, true);
- return new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
+
+ // Try to fetch an updated list of children from ZK.
+ ChildWatcher newWatcher = new ChildWatcher();
+ knownChildren = fetchZkChildren(newWatcher);
+ lastWatcher = newWatcher; // only set after fetchZkChildren returns successfully
+ isDirty = false;
+ if (knownChildren.isEmpty()) {
+ return null;
+ }
+ notEmpty.signalAll();
+ return remove ? knownChildren.pollFirst() : knownChildren.first();
} finally {
- time.stop();
+ updateLock.unlock();
}
}
-
+
/**
- * Returns the data at the first element of the queue, or null if the queue is
- * empty.
- *
- * @return data at the first element of the queue, or null.
+ * Return the current set of children from ZK; does not change internal state.
*/
- public byte[] peek() throws KeeperException, InterruptedException {
- TimerContext time = stats.time(dir + "_peek");
- try {
- QueueEvent element = element();
- if (element == null) return null;
- return element.getBytes();
- } finally {
- time.stop();
- }
- }
-
- public List<QueueEvent> peekTopN(int n, Set<String> excludeSet, Long wait)
- throws KeeperException, InterruptedException {
- ArrayList<QueueEvent> topN = new ArrayList<>();
-
- LOG.debug("Peeking for top {} elements. ExcludeSet: " + excludeSet.toString());
- TimerContext time = null;
- if (wait == Long.MAX_VALUE) time = stats.time(dir + "_peekTopN_wait_forever");
- else time = stats.time(dir + "_peekTopN_wait" + wait);
+ TreeSet<String> fetchZkChildren(Watcher watcher) throws InterruptedException, KeeperException {
+ while (true) {
+ try {
+ TreeSet<String> orderedChildren = new TreeSet<>();
- try {
- Map<Long, String> orderedChildren = getChildren(wait);
- for (String headNode : orderedChildren.values()) {
- if (headNode != null && topN.size() < n) {
- try {
- String id = dir + "/" + headNode;
- if (excludeSet != null && excludeSet.contains(id)) continue;
- QueueEvent queueEvent = new QueueEvent(id,
- zookeeper.getData(dir + "/" + headNode, null, null, true), null);
- topN.add(queueEvent);
- } catch (KeeperException.NoNodeException e) {
- // Another client removed the node first, try next
- }
- } else {
- if (topN.size() >= 1) {
- printQueueEventsListElementIds(topN);
- return topN;
+ List<String> childNames = zookeeper.getChildren(dir, watcher, true);
+ stats.setQueueLength(childNames.size());
+ for (String childName : childNames) {
+ // Check format
+ if (!childName.regionMatches(0, PREFIX, 0, PREFIX.length())) {
+ LOG.debug("Found child node with improper name: " + childName);
+ continue;
}
+ orderedChildren.add(childName);
}
+ return orderedChildren;
+ } catch (KeeperException.NoNodeException e) {
+ zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
+ // go back to the loop and try again
}
-
- if (topN.size() > 0 ) {
- printQueueEventsListElementIds(topN);
- return topN;
- }
- return null;
- } finally {
- time.stop();
}
}
- private void printQueueEventsListElementIds(ArrayList<QueueEvent> topN) {
- if(LOG.isDebugEnabled()) {
- StringBuffer sb = new StringBuffer("[");
- for(QueueEvent queueEvent: topN) {
- sb.append(queueEvent.getId()).append(", ");
+ /**
+ * Return the currently-known set of children from memory. If there are no children,
+ * waits up to {@code waitMillis} for at least one child to become available. May
+ * update the set of known children.
+ */
+ SortedSet<String> getChildren(long waitMillis) throws KeeperException, InterruptedException {
+ long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
+ while (waitNanos > 0) {
+ // Trigger a fetch if needed.
+ firstElement();
+ updateLock.lockInterruptibly();
+ try {
+ if (!knownChildren.isEmpty()) {
+ return new TreeSet<>(knownChildren);
+ }
+ waitNanos = notEmpty.awaitNanos(waitNanos);
+ } finally {
+ updateLock.unlock();
}
- sb.append("]");
- LOG.debug("Returning topN elements: {}", sb.toString());
}
+ return ImmutableSortedSet.of();
}
-
/**
+ * Return the head of the queue without modifying the queue.
*
- * Gets last element of the Queue without removing it.
+ * @return the data at the head of the queue.
*/
- public String getTailId() throws KeeperException, InterruptedException {
- TreeMap<Long, String> orderedChildren = null;
- orderedChildren = orderedChildren(null);
- if(orderedChildren == null || orderedChildren.isEmpty()) return null;
-
- for(String headNode : orderedChildren.descendingMap().values())
- if (headNode != null) {
+ private byte[] firstElement() throws KeeperException, InterruptedException {
+ while (true) {
+ String firstChild = firstChild(false);
+ if (firstChild == null) {
+ return null;
+ }
+ try {
+ return zookeeper.getData(dir + "/" + firstChild, null, null, true);
+ } catch (KeeperException.NoNodeException e) {
+ // Another client deleted the node first, remove the in-memory and retry.
+ updateLock.lockInterruptibly();
try {
- QueueEvent queueEvent = new QueueEvent(dir + "/" + headNode, zookeeper.getData(dir + "/" + headNode,
- null, null, true), null);
- return queueEvent.getId();
- } catch (KeeperException.NoNodeException e) {
- // Another client removed the node first, try next
+ knownChildren.remove(firstChild);
+ } finally {
+ updateLock.unlock();
}
}
- return null;
- }
-
- public static class QueueEvent {
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((id == null) ? 0 : id.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) return true;
- if (obj == null) return false;
- if (getClass() != obj.getClass()) return false;
- QueueEvent other = (QueueEvent) obj;
- if (id == null) {
- if (other.id != null) return false;
- } else if (!id.equals(other.id)) return false;
- return true;
- }
-
- private WatchedEvent event = null;
- private String id;
- private byte[] bytes;
-
- QueueEvent(String id, byte[] bytes, WatchedEvent event) {
- this.id = id;
- this.bytes = bytes;
- this.event = event;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getId() {
- return id;
- }
-
- public void setBytes(byte[] bytes) {
- this.bytes = bytes;
- }
-
- public byte[] getBytes() {
- return bytes;
- }
-
- public WatchedEvent getWatchedEvent() {
- return event;
}
-
- }
-
- /**
- * Returns the data at the first element of the queue, or null if the queue is
- * empty and block is false.
- *
- * @param block if true, blocks until an element enters the queue
- * @return data at the first element of the queue, or null.
- */
- public QueueEvent peek(boolean block) throws KeeperException, InterruptedException {
- return peek(block ? Long.MAX_VALUE : 0);
}
- /**
- * Returns the data at the first element of the queue, or null if the queue is
- * empty after wait ms.
- *
- * @param wait max wait time in ms.
- * @return data at the first element of the queue, or null.
- */
- public QueueEvent peek(long wait) throws KeeperException, InterruptedException {
- TimerContext time = null;
- if (wait == Long.MAX_VALUE) {
- time = stats.time(dir + "_peek_wait_forever");
- } else {
- time = stats.time(dir + "_peek_wait" + wait);
- }
- try {
- if (wait == 0) {
- return element();
+ private byte[] removeFirst() throws KeeperException, InterruptedException {
+ while (true) {
+ String firstChild = firstChild(true);
+ if (firstChild == null) {
+ return null;
}
-
- Map<Long, String> orderedChildren = getChildren(wait);
- for (String headNode : orderedChildren.values()) {
- String path = dir + "/" + headNode;
+ try {
+ String path = dir + "/" + firstChild;
+ byte[] result = zookeeper.getData(path, null, null, true);
+ zookeeper.delete(path, -1, true);
+ return result;
+ } catch (KeeperException.NoNodeException e) {
+ // Another client deleted the node first, remove the in-memory and retry.
+ updateLock.lockInterruptibly();
try {
- byte[] data = zookeeper.getData(path, null, null, true);
- return new QueueEvent(path, data, null);
- } catch (KeeperException.NoNodeException e) {
- // Another client deleted the node first.
+ knownChildren.remove(firstChild);
+ } finally {
+ updateLock.unlock();
}
}
- return null;
- } finally {
- time.stop();
}
}
-
- /**
- * Attempts to remove the head of the queue and return it. Returns null if the
- * queue is empty.
- *
- * @return Head of the queue or null.
- */
- public byte[] poll() throws KeeperException, InterruptedException {
- TimerContext time = stats.time(dir + "_poll");
+
+ @VisibleForTesting boolean hasWatcher() throws InterruptedException {
+ updateLock.lockInterruptibly();
try {
- return remove();
- } catch (NoSuchElementException e) {
- return null;
+ return lastWatcher != null;
} finally {
- time.stop();
+ updateLock.unlock();
}
}
- public Overseer.Stats getStats() {
- return stats;
+ private class ChildWatcher implements Watcher {
+
+ @Override
+ public void process(WatchedEvent event) {
+ updateLock.lock();
+ try {
+ // this watcher is automatically cleared when fired
+ if (lastWatcher == this) {
+ lastWatcher = null;
+ }
+ // Do no updates in this thread, just signal state back to client threads.
+ isDirty = true;
+ // optimistically signal any waiters that the queue may not be empty now, so they can wake up and retry
+ notEmpty.signalAll();
+ } finally {
+ updateLock.unlock();
+ }
+ }
}
}
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1696789&r1=1696788&r2=1696789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java Thu Aug 20 14:05:36 2015
@@ -188,7 +188,7 @@ public class Overseer implements Closeab
}
}
- DistributedQueue.QueueEvent head = null;
+ byte[] head = null;
try {
head = stateUpdateQueue.peek(true);
} catch (KeeperException e) {
@@ -207,8 +207,8 @@ public class Overseer implements Closeab
}
try {
while (head != null) {
- final byte[] data = head.getBytes();
- final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
+ final byte[] data = head;
+ final ZkNodeProps message = ZkNodeProps.load(data);
log.info("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());
// we can batch here because workQueue is our fallback in case a ZK write failed
clusterState = processQueueItem(message, clusterState, zkStateWriter, true, new ZkStateWriter.ZkWriteCallback() {
@@ -922,13 +922,13 @@ public class Overseer implements Closeab
}
/* Collection creation queue */
- static DistributedQueue getCollectionQueue(final SolrZkClient zkClient) {
+ static OverseerCollectionQueue getCollectionQueue(final SolrZkClient zkClient) {
return getCollectionQueue(zkClient, new Stats());
}
- static DistributedQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats) {
+ static OverseerCollectionQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats) {
createOverseerNode(zkClient);
- return new DistributedQueue(zkClient, "/overseer/collection-queue-work", zkStats);
+ return new OverseerCollectionQueue(zkClient, "/overseer/collection-queue-work", zkStats);
}
private static void createOverseerNode(final SolrZkClient zkClient) {
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1696789&r1=1696788&r2=1696789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Thu Aug 20 14:05:36 2015
@@ -53,7 +53,7 @@ public class OverseerCollectionProcessor
Overseer.Stats stats,
Overseer overseer,
OverseerNodePrioritizer overseerNodePrioritizer,
- DistributedQueue workQueue,
+ OverseerCollectionQueue workQueue,
DistributedMap runningMap,
DistributedMap completedMap,
DistributedMap failureMap) {
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerProcessor.java?rev=1696789&r1=1696788&r2=1696789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerProcessor.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerProcessor.java Thu Aug 20 14:05:36 2015
@@ -29,9 +29,8 @@ import java.util.concurrent.SynchronousQ
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.SolrResponse;
-import org.apache.solr.cloud.DistributedQueue.QueueEvent;
+import org.apache.solr.cloud.OverseerCollectionQueue.QueueEvent;
import org.apache.solr.cloud.Overseer.LeaderStatus;
-import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -66,7 +65,7 @@ public class OverseerProcessor implement
private static Logger log = LoggerFactory
.getLogger(OverseerProcessor.class);
- private DistributedQueue workQueue;
+ private OverseerCollectionQueue workQueue;
private DistributedMap runningMap;
private DistributedMap completedMap;
private DistributedMap failureMap;
@@ -105,7 +104,7 @@ public class OverseerProcessor implement
Overseer.Stats stats,
OverseerMessageHandlerSelector selector,
OverseerNodePrioritizer prioritizer,
- DistributedQueue workQueue,
+ OverseerCollectionQueue workQueue,
DistributedMap runningMap,
DistributedMap completedMap,
DistributedMap failureMap) {
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1696789&r1=1696788&r2=1696789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java Thu Aug 20 14:05:36 2015
@@ -122,7 +122,7 @@ public final class ZkController {
private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
private final DistributedQueue overseerJobQueue;
- private final DistributedQueue overseerCollectionQueue;
+ private final OverseerCollectionQueue overseerCollectionQueue;
private final DistributedMap overseerRunningMap;
private final DistributedMap overseerCompletedMap;
@@ -1776,7 +1776,7 @@ public final class ZkController {
return overseerJobQueue;
}
- public DistributedQueue getOverseerCollectionQueue() {
+ public OverseerCollectionQueue getOverseerCollectionQueue() {
return overseerCollectionQueue;
}
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1696789&r1=1696788&r2=1696789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Thu Aug 20 14:05:36 2015
@@ -36,8 +36,8 @@ import org.apache.solr.client.solrj.impl
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
import org.apache.solr.cloud.DistributedMap;
-import org.apache.solr.cloud.DistributedQueue;
-import org.apache.solr.cloud.DistributedQueue.QueueEvent;
+import org.apache.solr.cloud.OverseerCollectionQueue;
+import org.apache.solr.cloud.OverseerCollectionQueue.QueueEvent;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.cloud.overseer.SliceMutator;
@@ -252,7 +252,7 @@ public class CollectionsHandler extends
}
private boolean overseerCollectionQueueContains(String asyncId) throws KeeperException, InterruptedException {
- DistributedQueue collectionQueue = coreContainer.getZkController().getOverseerCollectionQueue();
+ OverseerCollectionQueue collectionQueue = coreContainer.getZkController().getOverseerCollectionQueue();
return collectionQueue.containsTaskWithRequestId(ASYNC, asyncId);
}
Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1696789&r1=1696788&r2=1696789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Thu Aug 20 14:05:36 2015
@@ -21,8 +21,8 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CharsRefBuilder;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.CloudDescriptor;
-import org.apache.solr.cloud.DistributedQueue;
import org.apache.solr.cloud.LeaderInitiatedRecoveryThread;
+import org.apache.solr.cloud.DistributedQueue;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.overseer.OverseerAction;
Modified: lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java?rev=1696789&r1=1696788&r2=1696789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java Thu Aug 20 14:05:36 2015
@@ -16,13 +16,19 @@ package org.apache.solr.cloud;
* the License.
*/
-import java.io.File;
import java.nio.charset.Charset;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.cloud.DistributedQueue.QueueEvent;
-
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -33,6 +39,7 @@ public class DistributedQueueTest extend
protected ZkTestServer zkServer;
protected SolrZkClient zkClient;
+ protected ExecutorService executor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrjNamedThreadFactory("dqtest-"));
@Before
@Override
@@ -44,37 +51,120 @@ public class DistributedQueueTest extend
@Test
public void testDistributedQueue() throws Exception {
String dqZNode = "/distqueue/test";
- String testData = "hello world";
- long timeoutMs = 500L;
+ byte[] data = "hello world".getBytes(UTF8);
- DistributedQueue dq = new DistributedQueue(zkClient, setupDistributedQueueZNode(dqZNode));
+ DistributedQueue dq = makeDistributedQueue(dqZNode);
// basic ops
- assertTrue(dq.poll() == null);
- byte[] data = testData.getBytes(UTF8);
+ assertNull(dq.poll());
+ try {
+ dq.remove();
+ fail("NoSuchElementException expected");
+ } catch (NoSuchElementException expected) {
+ // expected
+ }
+
+ dq.offer(data);
+ assertArrayEquals(dq.peek(500), data);
+ assertArrayEquals(dq.remove(), data);
+ assertNull(dq.poll());
+
dq.offer(data);
- assertEquals(new String(dq.peek(),UTF8), testData);
- assertEquals(new String(dq.take(),UTF8), testData);
- assertTrue(dq.poll() == null);
- QueueEvent qe = dq.offer(data, timeoutMs);
- assertNotNull(qe);
- assertEquals(new String(dq.remove(),UTF8), testData);
+ assertArrayEquals(dq.take(), data); // waits for data
+ assertNull(dq.poll());
+
+ dq.offer(data);
+ dq.peek(true); // wait until data is definitely there before calling remove
+ assertArrayEquals(dq.remove(), data);
+ assertNull(dq.poll());
// should block until the background thread makes the offer
(new QueueChangerThread(dq, 1000)).start();
- qe = dq.peek(true);
- assertNotNull(qe);
- dq.remove();
+ assertNotNull(dq.peek(true));
+ assertNotNull(dq.remove());
+ assertNull(dq.poll());
// timeout scenario ... background thread won't offer until long after the peek times out
QueueChangerThread qct = new QueueChangerThread(dq, 1000);
qct.start();
- qe = dq.peek(500);
- assertTrue(qe == null);
+ assertNull(dq.peek(500));
+ qct.join();
+ }
+
+ @Test
+ public void testDistributedQueueBlocking() throws Exception {
+ String dqZNode = "/distqueue/test";
+ String testData = "hello world";
+
+ final DistributedQueue dq = makeDistributedQueue(dqZNode);
+ assertNull(dq.peek());
+ Future<String> future = executor.submit(new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ return new String(dq.peek(true), UTF8);
+ }
+ });
try {
- qct.interrupt();
- } catch (Exception exc) {}
+ future.get(1000, TimeUnit.MILLISECONDS);
+ fail("TimeoutException expected");
+ } catch (TimeoutException expected) {
+ assertFalse(future.isDone());
+ }
+
+ // Ultimately trips the watcher, triggering child refresh
+ dq.offer(testData.getBytes(UTF8));
+ assertEquals(testData, future.get(1000, TimeUnit.MILLISECONDS));
+ assertNotNull(dq.poll());
+
+ // After draining the queue, a watcher should be set.
+ assertNull(dq.peek(100));
+ assertTrue(dq.hasWatcher());
+
+ forceSessionExpire();
+
+ // Session expiry should have fired the watcher.
+ Thread.sleep(100);
+ assertFalse(dq.hasWatcher());
+
+ // Rerun the earlier test make sure updates are still seen, post reconnection.
+ future = executor.submit(new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ return new String(dq.peek(true), UTF8);
+ }
+ });
+ try {
+ future.get(1000, TimeUnit.MILLISECONDS);
+ fail("TimeoutException expected");
+ } catch (TimeoutException expected) {
+ assertFalse(future.isDone());
+ }
+
+ // Ultimately trips the watcher, triggering child refresh
+ dq.offer(testData.getBytes(UTF8));
+ assertEquals(testData, future.get(1000, TimeUnit.MILLISECONDS));
+ assertNotNull(dq.poll());
+ assertNull(dq.poll());
+ }
+
+ private void forceSessionExpire() throws InterruptedException, TimeoutException {
+ long sessionId = zkClient.getSolrZooKeeper().getSessionId();
+ zkServer.expire(sessionId);
+ zkClient.getConnectionManager().waitForDisconnected(10000);
+ zkClient.getConnectionManager().waitForConnected(10000);
+ for (int i = 0; i < 100; ++i) {
+ if (zkClient.isConnected()) {
+ break;
+ }
+ Thread.sleep(50);
+ }
+ assertTrue(zkClient.isConnected());
+ assertFalse(sessionId == zkClient.getSolrZooKeeper().getSessionId());
+ }
+
+ protected DistributedQueue makeDistributedQueue(String dqZNode) throws Exception {
+ return new DistributedQueue(zkClient, setupNewDistributedQueueZNode(dqZNode));
}
private class QueueChangerThread extends Thread {
@@ -99,7 +189,7 @@ public class DistributedQueueTest extend
}
}
- protected String setupDistributedQueueZNode(String znodePath) throws Exception {
+ protected String setupNewDistributedQueueZNode(String znodePath) throws Exception {
if (!zkClient.exists("/", true))
zkClient.makePath("/", false, true);
if (zkClient.exists(znodePath, true))
@@ -113,8 +203,10 @@ public class DistributedQueueTest extend
public void tearDown() throws Exception {
try {
super.tearDown();
- } catch (Exception exc) {}
+ } catch (Exception exc) {
+ }
closeZk();
+ executor.shutdown();
}
protected void setupZk() throws Exception {
Modified: lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java?rev=1696789&r1=1696788&r2=1696789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java Thu Aug 20 14:05:36 2015
@@ -20,7 +20,7 @@ package org.apache.solr.cloud;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.cloud.DistributedQueue.QueueEvent;
+import org.apache.solr.cloud.OverseerCollectionQueue.QueueEvent;
import org.apache.solr.cloud.Overseer.LeaderStatus;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.SolrZkClient;
@@ -79,7 +79,7 @@ public class OverseerCollectionProcessor
private static final String COLLECTION_NAME = "mycollection";
private static final String CONFIG_NAME = "myconfig";
- private static DistributedQueue workQueueMock;
+ private static OverseerCollectionQueue workQueueMock;
private static DistributedMap runningMapMock;
private static DistributedMap completedMapMock;
private static DistributedMap failureMapMock;
@@ -105,7 +105,7 @@ public class OverseerCollectionProcessor
public OverseerCollectionProcessorToBeTested(ZkStateReader zkStateReader,
String myId, ShardHandlerFactory shardHandlerFactory,
String adminPath,
- DistributedQueue workQueue, DistributedMap runningMap,
+ OverseerCollectionQueue workQueue, DistributedMap runningMap,
DistributedMap completedMap,
DistributedMap failureMap) {
super(zkStateReader, myId, shardHandlerFactory, adminPath, new Overseer.Stats(), null, new OverseerNodePrioritizer(zkStateReader, adminPath, shardHandlerFactory), workQueue, runningMap, completedMap, failureMap);
@@ -120,7 +120,7 @@ public class OverseerCollectionProcessor
@BeforeClass
public static void setUpOnce() throws Exception {
- workQueueMock = createMock(DistributedQueue.class);
+ workQueueMock = createMock(OverseerCollectionQueue.class);
runningMapMock = createMock(DistributedMap.class);
completedMapMock = createMock(DistributedMap.class);
failureMapMock = createMock(DistributedMap.class);