You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@lucene.apache.org by sh...@apache.org on 2015/08/20 16:05:37 UTC

svn commit: r1696789 - in /lucene/dev/branches/branch_5x: ./ solr/ solr/core/ solr/core/src/java/org/apache/solr/cloud/ solr/core/src/java/org/apache/solr/handler/admin/ solr/core/src/java/org/apache/solr/update/processor/ solr/core/src/test/org/apache/solr/cloud/

Author: shalin
Date: Thu Aug 20 14:05:36 2015
New Revision: 1696789

URL: http://svn.apache.org/r1696789
Log:
SOLR-6760: New optimized DistributedQueue implementation for overseer

Added:
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionQueue.java
      - copied unchanged from r1696706, lucene/dev/trunk/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionQueue.java
    lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionQueueTest.java
      - copied unchanged from r1696706, lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionQueueTest.java
Modified:
    lucene/dev/branches/branch_5x/   (props changed)
    lucene/dev/branches/branch_5x/solr/   (props changed)
    lucene/dev/branches/branch_5x/solr/CHANGES.txt   (contents, props changed)
    lucene/dev/branches/branch_5x/solr/core/   (props changed)
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerProcessor.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
    lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
    lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
    lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java

Modified: lucene/dev/branches/branch_5x/solr/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/CHANGES.txt?rev=1696789&r1=1696788&r2=1696789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/CHANGES.txt (original)
+++ lucene/dev/branches/branch_5x/solr/CHANGES.txt Thu Aug 20 14:05:36 2015
@@ -74,6 +74,10 @@ Optimizations
   is anywhere from 20% to over 100% faster and produces less garbage on average.
   (yonik)
 
+* SOLR-6760: New optimized DistributedQueue implementation for overseer increases
+  message processing performance by ~470%.
+  (Noble Paul, Scott Blum, shalin)
+
 Other Changes
 ----------------------
 

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java?rev=1696789&r1=1696788&r2=1696789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/DistributedQueue.java Thu Aug 20 14:05:36 2015
@@ -18,19 +18,22 @@
 
 package org.apache.solr.cloud;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.TreeMap;
-
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSortedSet;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkCmdExecutor;
-import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.util.stats.TimerContext;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -40,23 +43,60 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A distributed queue from zk recipes.
+ * A distributed queue.
  */
 public class DistributedQueue {
   private static final Logger LOG = LoggerFactory.getLogger(DistributedQueue.class);
-  
-  private static long DEFAULT_TIMEOUT = 5*60*1000;
-  
-  private final String dir;
-  
-  private SolrZkClient zookeeper;
-  
-  private final String prefix = "qn-";
-  
-  private final String response_prefix = "qnr-" ;
 
-  private final Overseer.Stats stats;
-  
+  static final String PREFIX = "qn-";
+
+  /**
+   * Theory of operation:
+   * <p>
+   * Under ordinary circumstances we neither watch nor poll for children in ZK.
+   * Instead we keep an in-memory list of known child names.  When the in-memory
+   * list is exhausted, we then fetch from ZK.
+   * <p>
+   * We only bother setting a child watcher when the queue has no children in ZK.
+   */
+  private static final Object _IMPLEMENTATION_NOTES = null;
+
+  final String dir;
+
+  final SolrZkClient zookeeper;
+
+  final Overseer.Stats stats;
+
+  /**
+   * A lock that guards all of the mutable state that follows.
+   */
+  private final ReentrantLock updateLock = new ReentrantLock();
+
+  /**
+   * Contains the last set of children fetched from ZK. Elements are removed from the head of
+   * this in-memory set as they are consumed from the queue.  Due to the distributed nature
+   * of the queue, elements may appear in this set whose underlying nodes have been consumed in ZK.
+   * Therefore, methods like {@link #peek()} have to double-check actual node existence, and methods
+   * like {@link #poll()} must resolve any races by attempting to delete the underlying node.
+   */
+  private TreeSet<String> knownChildren = new TreeSet<>();
+
+  /**
+   * Used to wait on a non-empty queue; you must hold {@link #updateLock} and verify that
+   * {@link #knownChildren} is empty before waiting on this condition.
+   */
+  private final Condition notEmpty = updateLock.newCondition();
+
+  /**
+   * If non-null, the last watcher to listen for child changes.
+   */
+  private ChildWatcher lastWatcher = null;
+
+  /**
+   * If true, ZK's child list probably doesn't match what's in memory.
+   */
+  private boolean isDirty = true;
+
   public DistributedQueue(SolrZkClient zookeeper, String dir) {
     this(zookeeper, dir, new Overseer.Stats());
   }
@@ -77,554 +117,312 @@ public class DistributedQueue {
     this.zookeeper = zookeeper;
     this.stats = stats;
   }
-  
-  /**
-   * Returns a Map of the children, ordered by id.
-   * 
-   * @param watcher
-   *          optional watcher on getChildren() operation.
-   * @return map from id to child name for all children
-   */
-  private TreeMap<Long,String> orderedChildren(Watcher watcher)
-      throws KeeperException, InterruptedException {
-    TreeMap<Long,String> orderedChildren = new TreeMap<>();
-
-    List<String> childNames = zookeeper.getChildren(dir, watcher, true);
-    stats.setQueueLength(childNames.size());
-    for (String childName : childNames) {
-      try {
-        // Check format
-        if (!childName.regionMatches(0, prefix, 0, prefix.length())) {
-          LOG.debug("Found child node with improper name: " + childName);
-          continue;
-        }
-        String suffix = childName.substring(prefix.length());
-        Long childId = new Long(suffix);
-        orderedChildren.put(childId, childName);
-      } catch (NumberFormatException e) {
-        LOG.warn("Found child node with improper format : " + childName + " "
-            + e, e);
-      }
-    }
-    
-    return orderedChildren;
-  }
-
 
   /**
-   * Returns true if the queue contains a task with the specified async id.
+   * Returns the data at the first element of the queue, or null if the queue is
+   * empty.
+   *
+   * @return data at the first element of the queue, or null.
    */
-  public boolean containsTaskWithRequestId(String requestIdKey, String requestId)
-      throws KeeperException, InterruptedException {
-
-    List<String> childNames = zookeeper.getChildren(dir, null, true);
-    stats.setQueueLength(childNames.size());
-    for (String childName : childNames) {
-      if (childName != null) {
-        try {
-          byte[] data = zookeeper.getData(dir + "/" + childName, null, null, true);
-          if (data != null) {
-            ZkNodeProps message = ZkNodeProps.load(data);
-            if (message.containsKey(requestIdKey)) {
-              LOG.debug(">>>> {}", message.get(requestIdKey));
-              if(message.get(requestIdKey).equals(requestId)) return true;
-            }
-          }
-        } catch (KeeperException.NoNodeException e) {
-          // Another client removed the node first, try next
-        }
-      }
+  public byte[] peek() throws KeeperException, InterruptedException {
+    TimerContext time = stats.time(dir + "_peek");
+    try {
+      return firstElement();
+    } finally {
+      time.stop();
     }
-
-    return false;
   }
 
-
   /**
-   * Return the head of the queue without modifying the queue.
-   * 
-   * @return the data at the head of the queue.
+   * Returns the data at the first element of the queue, or null if the queue is
+   * empty and block is false.
+   *
+   * @param block if true, blocks until an element enters the queue
+   * @return data at the first element of the queue, or null.
    */
-  private QueueEvent element() throws KeeperException,
-      InterruptedException {
-    TreeMap<Long,String> orderedChildren;
-    
-    // element, take, and remove follow the same pattern.
-    // We want to return the child node with the smallest sequence number.
-    // Since other clients are remove()ing and take()ing nodes concurrently,
-    // the child with the smallest sequence number in orderedChildren might be
-    // gone by the time we check.
-    // We don't call getChildren again until we have tried the rest of the nodes
-    // in sequence order.
-    while (true) {
-      try {
-        orderedChildren = orderedChildren(null);
-      } catch (KeeperException.NoNodeException e) {
-        return null;
-      }
-      if (orderedChildren.size() == 0) return null;
-      
-      for (String headNode : orderedChildren.values()) {
-        if (headNode != null) {
-          try {
-            return new QueueEvent(dir + "/" + headNode, zookeeper.getData(dir + "/" + headNode, null, null, true), null);
-          } catch (KeeperException.NoNodeException e) {
-            // Another client removed the node first, try next
-          }
-        }
-      }
-    }
+  public byte[] peek(boolean block) throws KeeperException, InterruptedException {
+    return block ? peek(Long.MAX_VALUE) : peek();
   }
-  
+
   /**
-   * Attempts to remove the head of the queue and return it.
-   * 
-   * @return The former head of the queue
+   * Returns the data at the first element of the queue, or null if the queue is
+   * empty after wait ms.
+   *
+   * @param wait max wait time in ms.
+   * @return data at the first element of the queue, or null.
    */
-  public byte[] remove() throws NoSuchElementException, KeeperException,
-      InterruptedException {
-    TreeMap<Long,String> orderedChildren;
-    // Same as for element. Should refactor this.
-    TimerContext time = stats.time(dir + "_remove");
+  public byte[] peek(long wait) throws KeeperException, InterruptedException {
+    Preconditions.checkArgument(wait > 0);
+    TimerContext time;
+    if (wait == Long.MAX_VALUE) {
+      time = stats.time(dir + "_peek_wait_forever");
+    } else {
+      time = stats.time(dir + "_peek_wait" + wait);
+    }
+    updateLock.lockInterruptibly();
     try {
-      while (true) {
-        try {
-          orderedChildren = orderedChildren(null);
-        } catch (KeeperException.NoNodeException e) {
-          throw new NoSuchElementException();
-        }
-        if (orderedChildren.size() == 0) throw new NoSuchElementException();
-
-        for (String headNode : orderedChildren.values()) {
-          String path = dir + "/" + headNode;
-          try {
-            byte[] data = zookeeper.getData(path, null, null, true);
-            zookeeper.delete(path, -1, true);
-            return data;
-          } catch (KeeperException.NoNodeException e) {
-            // Another client deleted the node first.
-          }
+      long waitNanos = TimeUnit.MILLISECONDS.toNanos(wait);
+      while (waitNanos > 0) {
+        byte[] result = firstElement();
+        if (result != null) {
+          return result;
         }
-
+        waitNanos = notEmpty.awaitNanos(waitNanos);
       }
+      return null;
     } finally {
+      updateLock.unlock();
       time.stop();
     }
   }
-  
+
   /**
-   * Remove the event and save the response into the other path.
-   * 
+   * Attempts to remove the head of the queue and return it. Returns null if the
+   * queue is empty.
+   *
+   * @return Head of the queue or null.
    */
-  public byte[] remove(QueueEvent event) throws KeeperException,
-      InterruptedException {
-    TimerContext time = stats.time(dir + "_remove_event");
+  public byte[] poll() throws KeeperException, InterruptedException {
+    TimerContext time = stats.time(dir + "_poll");
     try {
-      String path = event.getId();
-      String responsePath = dir + "/" + response_prefix
-          + path.substring(path.lastIndexOf("-") + 1);
-      if (zookeeper.exists(responsePath, true)) {
-        zookeeper.setData(responsePath, event.getBytes(), true);
-      }
-      byte[] data = zookeeper.getData(path, null, null, true);
-      zookeeper.delete(path, -1, true);
-      return data;
+      return removeFirst();
     } finally {
       time.stop();
     }
   }
 
   /**
-   * Watcher that blocks until a WatchedEvent occurs for a znode.
+   * Attempts to remove the head of the queue and return it.
+   *
+   * @return The former head of the queue
    */
-  private final class LatchWatcher implements Watcher {
-
-    private final Object lock;
-    private WatchedEvent event;
-    private Event.EventType latchEventType;
-
-    LatchWatcher(Object lock) {
-      this(lock, null);
-    }
-
-    LatchWatcher(Event.EventType eventType) {
-      this(new Object(), eventType);
-    }
-
-    LatchWatcher(Object lock, Event.EventType eventType) {
-      this.lock = lock;
-      this.latchEventType = eventType;
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-      Event.EventType eventType = event.getType();
-      // None events are ignored
-      // If latchEventType is not null, only fire if the type matches
-      if (eventType != Event.EventType.None && (latchEventType == null || eventType == latchEventType)) {
-        LOG.info("{} fired on path {} state {}", eventType, event.getPath(), event.getState());
-        synchronized (lock) {
-          this.event = event;
-          lock.notifyAll();
-        }
-      }
-    }
-
-    public void await(long timeout) throws InterruptedException {
-      synchronized (lock) {
-        if (this.event != null) return;
-        lock.wait(timeout);
-      }
-    }
-
-    public WatchedEvent getWatchedEvent() {
-      return event;
-    }
-  }
-
-  // we avoid creating *many* watches in some cases
-  // by saving the childrenWatcher and the children associated - see SOLR-6336
-  private LatchWatcher childrenWatcher;
-  private TreeMap<Long,String> fetchedChildren;
-  private final Object childrenWatcherLock = new Object();
-
-  private Map<Long, String> getChildren(long wait) throws InterruptedException, KeeperException
-  {
-    LatchWatcher watcher;
-    TreeMap<Long,String> children;
-    synchronized (childrenWatcherLock) {
-      watcher = childrenWatcher;
-      children = fetchedChildren;
-    }
-
-    if (watcher == null ||  watcher.getWatchedEvent() != null) {
-      // this watcher is only interested in child change events
-      watcher = new LatchWatcher(Watcher.Event.EventType.NodeChildrenChanged);
-      while (true) {
-        try {
-          children = orderedChildren(watcher);
-          break;
-        } catch (KeeperException.NoNodeException e) {
-          zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
-          // go back to the loop and try again
-        }
-      }
-      synchronized (childrenWatcherLock) {
-        childrenWatcher = watcher;
-        fetchedChildren = children;
-      }
-    }
-
-    while (true) {
-      if (!children.isEmpty()) break;
-      watcher.await(wait == Long.MAX_VALUE ? DEFAULT_TIMEOUT : wait);
-      if (watcher.getWatchedEvent() != null) {
-        children = orderedChildren(null);
+  public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {
+    TimerContext time = stats.time(dir + "_remove");
+    try {
+      byte[] result = removeFirst();
+      if (result == null) {
+        throw new NoSuchElementException();
       }
-      if (wait != Long.MAX_VALUE) break;
+      return result;
+    } finally {
+      time.stop();
     }
-    return Collections.unmodifiableMap(children);
   }
 
   /**
    * Removes the head of the queue and returns it, blocks until it succeeds.
-   * 
+   *
    * @return The former head of the queue
    */
   public byte[] take() throws KeeperException, InterruptedException {
     // Same as for element. Should refactor this.
     TimerContext timer = stats.time(dir + "_take");
+    updateLock.lockInterruptibly();
     try {
-      Map<Long, String> orderedChildren = getChildren(Long.MAX_VALUE);
-      for (String headNode : orderedChildren.values()) {
-        String path = dir + "/" + headNode;
-        try {
-          byte[] data = zookeeper.getData(path, null, null, true);
-          zookeeper.delete(path, -1, true);
-          return data;
-        } catch (KeeperException.NoNodeException e) {
-          // Another client deleted the node first.
+      while (true) {
+        byte[] result = removeFirst();
+        if (result != null) {
+          return result;
         }
+        notEmpty.await();
       }
-      return null; // shouldn't really reach here..
     } finally {
+      updateLock.unlock();
       timer.stop();
     }
   }
-  
+
   /**
-   * Inserts data into queue.
-   * 
-   * @return true if data was successfully added
+   * Inserts data into queue.  Successfully calling this method does NOT guarantee
+   * that the element will be immediately available in the in-memory queue. In particular,
+   * calling this method on an empty queue will not necessarily cause {@link #poll()} to
+   * return the offered element.  Use a blocking method if you must wait for the offered
+   * element to become visible.
    */
-  public boolean offer(byte[] data) throws KeeperException,
-      InterruptedException {
+  public void offer(byte[] data) throws KeeperException, InterruptedException {
     TimerContext time = stats.time(dir + "_offer");
     try {
-      return createData(dir + "/" + prefix, data,
-          CreateMode.PERSISTENT_SEQUENTIAL) != null;
-    } finally {
-      time.stop();
-    }
-  }
-  
-  /**
-   * Inserts data into zookeeper.
-   * 
-   * @return true if data was successfully added
-   */
-  private String createData(String path, byte[] data, CreateMode mode)
-      throws KeeperException, InterruptedException {
-    for (;;) {
-      try {
-        return zookeeper.create(path, data, mode, true);
-      } catch (KeeperException.NoNodeException e) {
+      while (true) {
         try {
-          zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
-        } catch (KeeperException.NodeExistsException ne) {
-          // someone created it
+          // We don't need to explicitly set isDirty here; if there is a watcher, it will
+          // see the update and set the bit itself; if there is no watcher we can defer
+          // the update anyway.
+          zookeeper.create(dir + "/" + PREFIX, data, CreateMode.PERSISTENT_SEQUENTIAL, true);
+          return;
+        } catch (KeeperException.NoNodeException e) {
+          try {
+            zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
+          } catch (KeeperException.NodeExistsException ne) {
+            // someone created it
+          }
         }
       }
+    } finally {
+      time.stop();
     }
   }
-  
+
+  public Overseer.Stats getStats() {
+    return stats;
+  }
+
   /**
-   * Offer the data and wait for the response
-   * 
+   * Returns the name if the first known child node, or {@code null} if the queue is empty.
+   * This is the only place {@link #knownChildren} is ever updated!
+   * The caller must double check that the actual node still exists, since the in-memory
+   * list is inherently stale.
    */
-  public QueueEvent offer(byte[] data, long timeout) throws KeeperException,
-      InterruptedException {
-    TimerContext time = stats.time(dir + "_offer");
+  private String firstChild(boolean remove) throws KeeperException, InterruptedException {
+    updateLock.lockInterruptibly();
     try {
-      String path = createData(dir + "/" + prefix, data,
-          CreateMode.PERSISTENT_SEQUENTIAL);
-      String watchID = createData(
-          dir + "/" + response_prefix + path.substring(path.lastIndexOf("-") + 1),
-          null, CreateMode.EPHEMERAL);
-
-      Object lock = new Object();
-      LatchWatcher watcher = new LatchWatcher(lock);
-      synchronized (lock) {
-        if (zookeeper.exists(watchID, watcher, true) != null) {
-          watcher.await(timeout);
-        }
+      // Try to fetch the first in-memory child.
+      if (!knownChildren.isEmpty()) {
+        return remove ? knownChildren.pollFirst() : knownChildren.first();
+      }
+
+      if (lastWatcher != null && !isDirty) {
+        // No children, no known updates, and a watcher is already set; nothing we can do.
+        return null;
       }
-      byte[] bytes = zookeeper.getData(watchID, null, null, true);
-      zookeeper.delete(watchID, -1, true);
-      return new QueueEvent(watchID, bytes, watcher.getWatchedEvent());
+
+      // Try to fetch an updated list of children from ZK.
+      ChildWatcher newWatcher = new ChildWatcher();
+      knownChildren = fetchZkChildren(newWatcher);
+      lastWatcher = newWatcher; // only set after fetchZkChildren returns successfully
+      isDirty = false;
+      if (knownChildren.isEmpty()) {
+        return null;
+      }
+      notEmpty.signalAll();
+      return remove ? knownChildren.pollFirst() : knownChildren.first();
     } finally {
-      time.stop();
+      updateLock.unlock();
     }
   }
-  
+
   /**
-   * Returns the data at the first element of the queue, or null if the queue is
-   * empty.
-   * 
-   * @return data at the first element of the queue, or null.
+   * Return the current set of children from ZK; does not change internal state.
    */
-  public byte[] peek() throws KeeperException, InterruptedException {
-    TimerContext time = stats.time(dir + "_peek");
-    try {
-      QueueEvent element = element();
-      if (element == null) return null;
-      return element.getBytes();
-    } finally {
-      time.stop();
-    }
-  }
-  
-  public List<QueueEvent> peekTopN(int n, Set<String> excludeSet, Long wait)
-      throws KeeperException, InterruptedException {
-    ArrayList<QueueEvent> topN = new ArrayList<>();
-
-    LOG.debug("Peeking for top {} elements. ExcludeSet: " + excludeSet.toString());
-    TimerContext time = null;
-    if (wait == Long.MAX_VALUE) time = stats.time(dir + "_peekTopN_wait_forever");
-    else time = stats.time(dir + "_peekTopN_wait" + wait);
+  TreeSet<String> fetchZkChildren(Watcher watcher) throws InterruptedException, KeeperException {
+    while (true) {
+      try {
+        TreeSet<String> orderedChildren = new TreeSet<>();
 
-    try {
-      Map<Long, String> orderedChildren = getChildren(wait);
-      for (String headNode : orderedChildren.values()) {
-        if (headNode != null && topN.size() < n) {
-          try {
-            String id = dir + "/" + headNode;
-            if (excludeSet != null && excludeSet.contains(id)) continue;
-            QueueEvent queueEvent = new QueueEvent(id,
-                zookeeper.getData(dir + "/" + headNode, null, null, true), null);
-            topN.add(queueEvent);
-          } catch (KeeperException.NoNodeException e) {
-            // Another client removed the node first, try next
-          }
-        } else {
-          if (topN.size() >= 1) {
-            printQueueEventsListElementIds(topN);
-            return topN;
+        List<String> childNames = zookeeper.getChildren(dir, watcher, true);
+        stats.setQueueLength(childNames.size());
+        for (String childName : childNames) {
+          // Check format
+          if (!childName.regionMatches(0, PREFIX, 0, PREFIX.length())) {
+            LOG.debug("Found child node with improper name: " + childName);
+            continue;
           }
+          orderedChildren.add(childName);
         }
+        return orderedChildren;
+      } catch (KeeperException.NoNodeException e) {
+        zookeeper.create(dir, new byte[0], CreateMode.PERSISTENT, true);
+        // go back to the loop and try again
       }
-
-      if (topN.size() > 0 ) {
-        printQueueEventsListElementIds(topN);
-        return topN;
-      }
-      return null;
-    } finally {
-      time.stop();
     }
   }
 
-  private void printQueueEventsListElementIds(ArrayList<QueueEvent> topN) {
-    if(LOG.isDebugEnabled()) {
-      StringBuffer sb = new StringBuffer("[");
-      for(QueueEvent queueEvent: topN) {
-        sb.append(queueEvent.getId()).append(", ");
+  /**
+   * Return the currently-known set of children from memory. If there are no children,
+   * waits up to {@code waitMillis} for at least one child to become available. May
+   * update the set of known children.
+   */
+  SortedSet<String> getChildren(long waitMillis) throws KeeperException, InterruptedException {
+    long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
+    while (waitNanos > 0) {
+      // Trigger a fetch if needed.
+      firstElement();
+      updateLock.lockInterruptibly();
+      try {
+        if (!knownChildren.isEmpty()) {
+          return new TreeSet<>(knownChildren);
+        }
+        waitNanos = notEmpty.awaitNanos(waitNanos);
+      } finally {
+        updateLock.unlock();
       }
-      sb.append("]");
-      LOG.debug("Returning topN elements: {}", sb.toString());
     }
+    return ImmutableSortedSet.of();
   }
 
-
   /**
+   * Return the head of the queue without modifying the queue.
    *
-   * Gets last element of the Queue without removing it.
+   * @return the data at the head of the queue.
    */
-  public String getTailId() throws KeeperException, InterruptedException {
-    TreeMap<Long, String> orderedChildren = null;
-    orderedChildren = orderedChildren(null);
-    if(orderedChildren == null || orderedChildren.isEmpty()) return null;
-
-    for(String headNode : orderedChildren.descendingMap().values())
-      if (headNode != null) {
+  private byte[] firstElement() throws KeeperException, InterruptedException {
+    while (true) {
+      String firstChild = firstChild(false);
+      if (firstChild == null) {
+        return null;
+      }
+      try {
+        return zookeeper.getData(dir + "/" + firstChild, null, null, true);
+      } catch (KeeperException.NoNodeException e) {
+        // Another client deleted the node first, remove the in-memory and retry.
+        updateLock.lockInterruptibly();
         try {
-          QueueEvent queueEvent = new QueueEvent(dir + "/" + headNode, zookeeper.getData(dir + "/" + headNode,
-              null, null, true), null);
-          return queueEvent.getId();
-        } catch (KeeperException.NoNodeException e) {
-          // Another client removed the node first, try next
+          knownChildren.remove(firstChild);
+        } finally {
+          updateLock.unlock();
         }
       }
-    return null;
-  }
-  
-  public static class QueueEvent {
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = 1;
-      result = prime * result + ((id == null) ? 0 : id.hashCode());
-      return result;
-    }
-    
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) return true;
-      if (obj == null) return false;
-      if (getClass() != obj.getClass()) return false;
-      QueueEvent other = (QueueEvent) obj;
-      if (id == null) {
-        if (other.id != null) return false;
-      } else if (!id.equals(other.id)) return false;
-      return true;
-    }
-    
-    private WatchedEvent event = null;
-    private String id;
-    private byte[] bytes;
-    
-    QueueEvent(String id, byte[] bytes, WatchedEvent event) {
-      this.id = id;
-      this.bytes = bytes;
-      this.event = event;
-    }
-    
-    public void setId(String id) {
-      this.id = id;
-    }
-    
-    public String getId() {
-      return id;
-    }
-    
-    public void setBytes(byte[] bytes) {
-      this.bytes = bytes;
-    }
-    
-    public byte[] getBytes() {
-      return bytes;
-    }
-    
-    public WatchedEvent getWatchedEvent() {
-      return event;
     }
-    
-  }
-  
-  /**
-   * Returns the data at the first element of the queue, or null if the queue is
-   * empty and block is false.
-   * 
-   * @param block if true, blocks until an element enters the queue
-   * @return data at the first element of the queue, or null.
-   */
-  public QueueEvent peek(boolean block) throws KeeperException, InterruptedException {
-    return peek(block ? Long.MAX_VALUE : 0);
   }
 
-  /**
-   * Returns the data at the first element of the queue, or null if the queue is
-   * empty after wait ms.
-   * 
-   * @param wait max wait time in ms.
-   * @return data at the first element of the queue, or null.
-   */
-  public QueueEvent peek(long wait) throws KeeperException, InterruptedException {
-    TimerContext time = null;
-    if (wait == Long.MAX_VALUE) {
-      time = stats.time(dir + "_peek_wait_forever");
-    } else {
-      time = stats.time(dir + "_peek_wait" + wait);
-    }
-    try {
-      if (wait == 0) {
-        return element();
+  private byte[] removeFirst() throws KeeperException, InterruptedException {
+    while (true) {
+      String firstChild = firstChild(true);
+      if (firstChild == null) {
+        return null;
       }
-
-      Map<Long, String> orderedChildren = getChildren(wait);
-      for (String headNode : orderedChildren.values()) {
-        String path = dir + "/" + headNode;
+      try {
+        String path = dir + "/" + firstChild;
+        byte[] result = zookeeper.getData(path, null, null, true);
+        zookeeper.delete(path, -1, true);
+        return result;
+      } catch (KeeperException.NoNodeException e) {
+        // Another client deleted the node first, remove the in-memory and retry.
+        updateLock.lockInterruptibly();
         try {
-          byte[] data = zookeeper.getData(path, null, null, true);
-          return new QueueEvent(path, data, null);
-        } catch (KeeperException.NoNodeException e) {
-          // Another client deleted the node first.
+          knownChildren.remove(firstChild);
+        } finally {
+          updateLock.unlock();
         }
       }
-      return null;
-    } finally {
-      time.stop();
     }
   }
-  
-  /**
-   * Attempts to remove the head of the queue and return it. Returns null if the
-   * queue is empty.
-   * 
-   * @return Head of the queue or null.
-   */
-  public byte[] poll() throws KeeperException, InterruptedException {
-    TimerContext time = stats.time(dir + "_poll");
+
+  @VisibleForTesting boolean hasWatcher() throws InterruptedException {
+    updateLock.lockInterruptibly();
     try {
-      return remove();
-    } catch (NoSuchElementException e) {
-      return null;
+      return lastWatcher != null;
     } finally {
-      time.stop();
+      updateLock.unlock();
     }
   }
 
-  public Overseer.Stats getStats() {
-    return stats;
+  private class ChildWatcher implements Watcher {
+
+    @Override
+    public void process(WatchedEvent event) {
+      updateLock.lock();
+      try {
+        // this watcher is automatically cleared when fired
+        if (lastWatcher == this) {
+          lastWatcher = null;
+        }
+        // Do no updates in this thread, just signal state back to client threads.
+        isDirty = true;
+        // optimistically signal any waiters that the queue may not be empty now, so they can wake up and retry
+        notEmpty.signalAll();
+      } finally {
+        updateLock.unlock();
+      }
+    }
   }
 }

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java?rev=1696789&r1=1696788&r2=1696789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/Overseer.java Thu Aug 20 14:05:36 2015
@@ -188,7 +188,7 @@ public class Overseer implements Closeab
             }
           }
 
-          DistributedQueue.QueueEvent head = null;
+          byte[] head = null;
           try {
             head = stateUpdateQueue.peek(true);
           } catch (KeeperException e) {
@@ -207,8 +207,8 @@ public class Overseer implements Closeab
           }
           try {
             while (head != null) {
-              final byte[] data = head.getBytes();
-              final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
+              final byte[] data = head;
+              final ZkNodeProps message = ZkNodeProps.load(data);
               log.info("processMessage: queueSize: {}, message = {} current state version: {}", stateUpdateQueue.getStats().getQueueLength(), message, clusterState.getZkClusterStateVersion());
               // we can batch here because workQueue is our fallback in case a ZK write failed
               clusterState = processQueueItem(message, clusterState, zkStateWriter, true, new ZkStateWriter.ZkWriteCallback() {
@@ -922,13 +922,13 @@ public class Overseer implements Closeab
   }
   
   /* Collection creation queue */
-  static DistributedQueue getCollectionQueue(final SolrZkClient zkClient) {
+  static OverseerCollectionQueue getCollectionQueue(final SolrZkClient zkClient) {
     return getCollectionQueue(zkClient, new Stats());
   }
 
-  static DistributedQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats)  {
+  static OverseerCollectionQueue getCollectionQueue(final SolrZkClient zkClient, Stats zkStats)  {
     createOverseerNode(zkClient);
-    return new DistributedQueue(zkClient, "/overseer/collection-queue-work", zkStats);
+    return new OverseerCollectionQueue(zkClient, "/overseer/collection-queue-work", zkStats);
   }
   
   private static void createOverseerNode(final SolrZkClient zkClient) {

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java?rev=1696789&r1=1696788&r2=1696789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionProcessor.java Thu Aug 20 14:05:36 2015
@@ -53,7 +53,7 @@ public class OverseerCollectionProcessor
                                         Overseer.Stats stats,
                                         Overseer overseer,
                                         OverseerNodePrioritizer overseerNodePrioritizer,
-                                        DistributedQueue workQueue,
+                                        OverseerCollectionQueue workQueue,
                                         DistributedMap runningMap,
                                         DistributedMap completedMap,
                                         DistributedMap failureMap) {

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerProcessor.java?rev=1696789&r1=1696788&r2=1696789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerProcessor.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/OverseerProcessor.java Thu Aug 20 14:05:36 2015
@@ -29,9 +29,8 @@ import java.util.concurrent.SynchronousQ
 import java.util.concurrent.TimeUnit;
 
 import org.apache.solr.client.solrj.SolrResponse;
-import org.apache.solr.cloud.DistributedQueue.QueueEvent;
+import org.apache.solr.cloud.OverseerCollectionQueue.QueueEvent;
 import org.apache.solr.cloud.Overseer.LeaderStatus;
-import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -66,7 +65,7 @@ public class OverseerProcessor implement
   private static Logger log = LoggerFactory
       .getLogger(OverseerProcessor.class);
 
-  private DistributedQueue workQueue;
+  private OverseerCollectionQueue workQueue;
   private DistributedMap runningMap;
   private DistributedMap completedMap;
   private DistributedMap failureMap;
@@ -105,7 +104,7 @@ public class OverseerProcessor implement
                                         Overseer.Stats stats,
                                         OverseerMessageHandlerSelector selector,
                                         OverseerNodePrioritizer prioritizer,
-                                        DistributedQueue workQueue,
+                                        OverseerCollectionQueue workQueue,
                                         DistributedMap runningMap,
                                         DistributedMap completedMap,
                                         DistributedMap failureMap) {

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java?rev=1696789&r1=1696788&r2=1696789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/cloud/ZkController.java Thu Aug 20 14:05:36 2015
@@ -122,7 +122,7 @@ public final class ZkController {
   private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
   
   private final DistributedQueue overseerJobQueue;
-  private final DistributedQueue overseerCollectionQueue;
+  private final OverseerCollectionQueue overseerCollectionQueue;
 
   private final DistributedMap overseerRunningMap;
   private final DistributedMap overseerCompletedMap;
@@ -1776,7 +1776,7 @@ public final class ZkController {
     return overseerJobQueue;
   }
 
-  public DistributedQueue getOverseerCollectionQueue() {
+  public OverseerCollectionQueue getOverseerCollectionQueue() {
     return overseerCollectionQueue;
   }
 

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java?rev=1696789&r1=1696788&r2=1696789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java Thu Aug 20 14:05:36 2015
@@ -36,8 +36,8 @@ import org.apache.solr.client.solrj.impl
 import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
 import org.apache.solr.cloud.DistributedMap;
-import org.apache.solr.cloud.DistributedQueue;
-import org.apache.solr.cloud.DistributedQueue.QueueEvent;
+import org.apache.solr.cloud.OverseerCollectionQueue;
+import org.apache.solr.cloud.OverseerCollectionQueue.QueueEvent;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.OverseerSolrResponse;
 import org.apache.solr.cloud.overseer.SliceMutator;
@@ -252,7 +252,7 @@ public class CollectionsHandler extends
   }
 
   private boolean overseerCollectionQueueContains(String asyncId) throws KeeperException, InterruptedException {
-    DistributedQueue collectionQueue = coreContainer.getZkController().getOverseerCollectionQueue();
+    OverseerCollectionQueue collectionQueue = coreContainer.getZkController().getOverseerCollectionQueue();
     return collectionQueue.containsTaskWithRequestId(ASYNC, asyncId);
   }
 

Modified: lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java?rev=1696789&r1=1696788&r2=1696789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java Thu Aug 20 14:05:36 2015
@@ -21,8 +21,8 @@ import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.CharsRefBuilder;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.cloud.CloudDescriptor;
-import org.apache.solr.cloud.DistributedQueue;
 import org.apache.solr.cloud.LeaderInitiatedRecoveryThread;
+import org.apache.solr.cloud.DistributedQueue;
 import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cloud.overseer.OverseerAction;

Modified: lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java?rev=1696789&r1=1696788&r2=1696789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/DistributedQueueTest.java Thu Aug 20 14:05:36 2015
@@ -16,13 +16,19 @@ package org.apache.solr.cloud;
  * the License.
  */
 
-import java.io.File;
 import java.nio.charset.Charset;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.cloud.DistributedQueue.QueueEvent;
-
+import org.apache.solr.common.util.ExecutorUtil;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -33,6 +39,7 @@ public class DistributedQueueTest extend
 
   protected ZkTestServer zkServer;
   protected SolrZkClient zkClient;
+  protected ExecutorService executor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrjNamedThreadFactory("dqtest-"));
 
   @Before
   @Override
@@ -44,37 +51,120 @@ public class DistributedQueueTest extend
   @Test
   public void testDistributedQueue() throws Exception {
     String dqZNode = "/distqueue/test";
-    String testData = "hello world";
-    long timeoutMs = 500L;
+    byte[] data = "hello world".getBytes(UTF8);
 
-    DistributedQueue dq = new DistributedQueue(zkClient, setupDistributedQueueZNode(dqZNode));
+    DistributedQueue dq = makeDistributedQueue(dqZNode);
 
     // basic ops
-    assertTrue(dq.poll() == null);
-    byte[] data = testData.getBytes(UTF8);
+    assertNull(dq.poll());
+    try {
+      dq.remove();
+      fail("NoSuchElementException expected");
+    } catch (NoSuchElementException expected) {
+      // expected
+    }
+
+    dq.offer(data);
+    assertArrayEquals(dq.peek(500), data);
+    assertArrayEquals(dq.remove(), data);
+    assertNull(dq.poll());
+
     dq.offer(data);
-    assertEquals(new String(dq.peek(),UTF8), testData);
-    assertEquals(new String(dq.take(),UTF8), testData);
-    assertTrue(dq.poll() == null);
-    QueueEvent qe = dq.offer(data, timeoutMs);
-    assertNotNull(qe);
-    assertEquals(new String(dq.remove(),UTF8), testData);
+    assertArrayEquals(dq.take(), data); // waits for data
+    assertNull(dq.poll());
+
+    dq.offer(data);
+    dq.peek(true); // wait until data is definitely there before calling remove
+    assertArrayEquals(dq.remove(), data);
+    assertNull(dq.poll());
 
     // should block until the background thread makes the offer
     (new QueueChangerThread(dq, 1000)).start();
-    qe = dq.peek(true);
-    assertNotNull(qe);
-    dq.remove();
+    assertNotNull(dq.peek(true));
+    assertNotNull(dq.remove());
+    assertNull(dq.poll());
 
     // timeout scenario ... background thread won't offer until long after the peek times out
     QueueChangerThread qct = new QueueChangerThread(dq, 1000);
     qct.start();
-    qe = dq.peek(500);
-    assertTrue(qe == null);
+    assertNull(dq.peek(500));
+    qct.join();
+  }
+
+  @Test
+  public void testDistributedQueueBlocking() throws Exception { // peek(true) must block until an offer arrives, both before and after a session expiry
+    String dqZNode = "/distqueue/test";
+    String testData = "hello world";
+
+    final DistributedQueue dq = makeDistributedQueue(dqZNode);
 
+    assertNull(dq.peek());
+    Future<String> future = executor.submit(new Callable<String>() {
+      @Override
+      public String call() throws Exception {
+        return new String(dq.peek(true), UTF8);
+      }
+    });
     try {
-      qct.interrupt();
-    } catch (Exception exc) {}
+      future.get(1000, TimeUnit.MILLISECONDS);
+      fail("TimeoutException expected");
+    } catch (TimeoutException expected) {
+      assertFalse(future.isDone());
+    }
+
+    // The offer trips the child watcher, which flags the queue dirty and signals waiters so the blocked peek can refresh
+    dq.offer(testData.getBytes(UTF8));
+    assertEquals(testData, future.get(1000, TimeUnit.MILLISECONDS));
+    assertNotNull(dq.poll());
+
+    // After draining the queue, a watcher should be set.
+    assertNull(dq.peek(100));
+    assertTrue(dq.hasWatcher());
+
+    forceSessionExpire();
+
+    // Session expiry should have fired the watcher; the short sleep gives the callback time to run (timing-dependent).
+    Thread.sleep(100);
+    assertFalse(dq.hasWatcher());
+
+    // Rerun the earlier test to make sure updates are still seen after reconnection.
+    future = executor.submit(new Callable<String>() {
+      @Override
+      public String call() throws Exception {
+        return new String(dq.peek(true), UTF8);
+      }
+    });
+    try {
+      future.get(1000, TimeUnit.MILLISECONDS);
+      fail("TimeoutException expected");
+    } catch (TimeoutException expected) {
+      assertFalse(future.isDone());
+    }
+
+    // The offer trips the (re-registered) child watcher, waking the blocked peek
+    dq.offer(testData.getBytes(UTF8));
+    assertEquals(testData, future.get(1000, TimeUnit.MILLISECONDS));
+    assertNotNull(dq.poll());
+    assertNull(dq.poll());
+  }
+
+  private void forceSessionExpire() throws InterruptedException, TimeoutException { // expires the client's ZK session and waits for it to reconnect with a new one
+    long sessionId = zkClient.getSolrZooKeeper().getSessionId();
+    zkServer.expire(sessionId); // expire the current session on the test server
+    zkClient.getConnectionManager().waitForDisconnected(10000);
+    zkClient.getConnectionManager().waitForConnected(10000);
+    for (int i = 0; i < 100; ++i) { // then poll up to 100 x 50 ms for isConnected() to report true
+      if (zkClient.isConnected()) {
+        break;
+      }
+      Thread.sleep(50);
+    }
+    assertTrue(zkClient.isConnected());
+    assertFalse(sessionId == zkClient.getSolrZooKeeper().getSessionId()); // reconnect must have produced a new session id
+  }
+
+  protected DistributedQueue makeDistributedQueue(String dqZNode) throws Exception {
+    return new DistributedQueue(zkClient, setupNewDistributedQueueZNode(dqZNode));
   }
 
   private class QueueChangerThread extends Thread {
@@ -99,7 +189,7 @@ public class DistributedQueueTest extend
     }
   }
 
-  protected String setupDistributedQueueZNode(String znodePath) throws Exception {
+  protected String setupNewDistributedQueueZNode(String znodePath) throws Exception {
     if (!zkClient.exists("/", true))
       zkClient.makePath("/", false, true);
     if (zkClient.exists(znodePath, true))
@@ -113,8 +203,10 @@ public class DistributedQueueTest extend
   public void tearDown() throws Exception {
     try {
       super.tearDown();
-    } catch (Exception exc) {}
+    } catch (Exception exc) { // deliberately ignored so closeZk() and executor.shutdown() below still run
+    }
     closeZk();
+    executor.shutdown(); // NOTE(review): does not awaitTermination, so a still-blocked peek task may outlive the test
   }
 
   protected void setupZk() throws Exception {

Modified: lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java?rev=1696789&r1=1696788&r2=1696789&view=diff
==============================================================================
--- lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java (original)
+++ lucene/dev/branches/branch_5x/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionProcessorTest.java Thu Aug 20 14:05:36 2015
@@ -20,7 +20,7 @@ package org.apache.solr.cloud;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.cloud.DistributedQueue.QueueEvent;
+import org.apache.solr.cloud.OverseerCollectionQueue.QueueEvent;
 import org.apache.solr.cloud.Overseer.LeaderStatus;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.SolrZkClient;
@@ -79,7 +79,7 @@ public class OverseerCollectionProcessor
   private static final String COLLECTION_NAME = "mycollection";
   private static final String CONFIG_NAME = "myconfig";
   
-  private static DistributedQueue workQueueMock;
+  private static OverseerCollectionQueue workQueueMock;
   private static DistributedMap runningMapMock;
   private static DistributedMap completedMapMock;
   private static DistributedMap failureMapMock;
@@ -105,7 +105,7 @@ public class OverseerCollectionProcessor
     public OverseerCollectionProcessorToBeTested(ZkStateReader zkStateReader,
         String myId, ShardHandlerFactory shardHandlerFactory,
         String adminPath,
-        DistributedQueue workQueue, DistributedMap runningMap,
+        OverseerCollectionQueue workQueue, DistributedMap runningMap,
         DistributedMap completedMap,
         DistributedMap failureMap) {
       super(zkStateReader, myId, shardHandlerFactory, adminPath, new Overseer.Stats(), null, new OverseerNodePrioritizer(zkStateReader, adminPath, shardHandlerFactory), workQueue, runningMap, completedMap, failureMap);
@@ -120,7 +120,7 @@ public class OverseerCollectionProcessor
   
   @BeforeClass
   public static void setUpOnce() throws Exception {
-    workQueueMock = createMock(DistributedQueue.class);
+    workQueueMock = createMock(OverseerCollectionQueue.class);
     runningMapMock = createMock(DistributedMap.class);
     completedMapMock = createMock(DistributedMap.class);
     failureMapMock = createMock(DistributedMap.class);