You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/10/31 04:10:32 UTC

[lucene-solr] branch reference_impl_dev updated: @1096 Go Overseer, plus ultra.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new 11fc487  @1096 Go Overseer, plus ultra.
11fc487 is described below

commit 11fc487d6107a90586f95476940e92c9c5b24792
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Oct 30 23:10:09 2020 -0500

    @1096 Go Overseer, plus ultra.
---
 .../src/java/org/apache/solr/cloud/Overseer.java   |   7 +-
 .../apache/solr/cloud/OverseerTaskProcessor.java   |  31 +-
 .../org/apache/solr/cloud/OverseerTaskQueue.java   | 108 ++++--
 .../org/apache/solr/cloud/ZkDistributedQueue.java  | 394 ++++++++++++++-------
 .../org/apache/solr/common/cloud/SolrZkClient.java |  38 ++
 5 files changed, 415 insertions(+), 163 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index a277a42..0fc4350 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -17,6 +17,7 @@
 package org.apache.solr.cloud;
 
 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Collection;
@@ -315,7 +316,7 @@ public class Overseer implements SolrCloseable {
             // We do not need to filter any nodes here cause all processed nodes are removed once we flush clusterstate
 
             long wait = 5000;
-            queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, wait, (x) -> false));
+            queue = new LinkedList<>(stateUpdateQueue.peekElements(1000, wait, (x) -> x.startsWith(OverseerTaskQueue.RESPONSE_PREFIX)));
           } catch (AlreadyClosedException e) {
             if (isClosed()) {
               log.info("Overseer closed (AlreadyClosedException), exiting loop");
@@ -350,7 +351,7 @@ public class Overseer implements SolrCloseable {
                 final ZkNodeProps message = ZkNodeProps.load(data);
                 if (log.isDebugEnabled()) log.debug("processMessage: queueSize: {}, message = {}", stateUpdateQueue.getZkStats().getQueueLength(), message);
                 if (log.isDebugEnabled()) log.debug("add processed node: {}, processedNodes = {}", head.first(), stateUpdateQueue.getZkStats().getQueueLength(), processedNodes);
-                processedNodes.add(head.first());
+                processedNodes.add(new File(head.first()).getName());
                 // The callback always be called on this thread
                 processQueueItem(message, reader.getClusterState(), zkStateWriter, true, null);
               }
@@ -361,7 +362,7 @@ public class Overseer implements SolrCloseable {
               // if an event comes in the next *ms batch it together
               int wait = 10;
               if (log.isDebugEnabled()) log.debug("going to peekElements processedNodes={}", processedNodes);
-              queue = new LinkedList<>(stateUpdateQueue.peekElements(10, wait, node -> processedNodes.contains(node)));
+              queue = new LinkedList<>(stateUpdateQueue.peekElements(10, wait, node -> processedNodes.contains(node) || node.startsWith(OverseerTaskQueue.RESPONSE_PREFIX)));
             }
             fallbackQueueSize = processedNodes.size();
             // we should force write all pending updates because the next iteration might sleep until there
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index 6888db1..5c0f33c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -100,10 +100,14 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
   // This is an optimization to ensure that we do not read the same tasks
   // again and again from ZK.
   final private Map<String, QueueEvent> blockedTasks = new ConcurrentSkipListMap<>();
-  final private Predicate<String> excludedTasks = new Predicate<String>() {
+  final private Predicate<String> excludedTasks = new Predicate<>() {
     @Override
     public boolean test(String s) {
       // nocommit
+      if (s.startsWith(OverseerTaskQueue.RESPONSE_PREFIX)) {
+        if (log.isDebugEnabled()) log.debug("exclude {} due to prefix {}", s, OverseerTaskQueue.RESPONSE_PREFIX);
+        return true;
+      }
 
       boolean contains = runningTasks.contains(s) || blockedTasks.containsKey(s) || runningZKTasks.contains(s);
       if (log.isDebugEnabled()) log.debug("test {} against {}, {}, {}  : {}", s, runningTasks, blockedTasks, runningZKTasks, contains);
@@ -220,17 +224,22 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
 
         for (QueueEvent head : heads) {
 
-//          if (runningZKTasks.contains(head.getId())) {
-//            log.warn("Task found in running ZKTasks already, continuing");
-//            continue;
-//          }
+          if (runningZKTasks.contains(head.getId())) {
+            log.warn("Task found in running ZKTasks already, continuing");
+            continue;
+          }
+
+          if (head.getBytes() == null) {
+            log.info("Found empty entry id={} event={}", head.getId(), head.getWatchedEvent());
+            continue;
+          }
 
           final ZkNodeProps message = ZkNodeProps.load(head.getBytes());
           final String asyncId = message.getStr(ASYNC);
           if (hasLeftOverItems) {
             if (head.getId().equals(oldestItemInWorkQueue)) hasLeftOverItems = false;
             if (asyncId != null && (completedMap.contains(asyncId) || failureMap.contains(asyncId))) {
-              log.debug("Found already processed task in workQueue, cleaning up. AsyncId [{}]", asyncId);
+              if (log.isDebugEnabled()) log.debug("Found already processed task in workQueue, cleaning up. AsyncId [{}]", asyncId);
               workQueue.remove(head);
               continue;
             }
@@ -294,9 +303,10 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
     Set<Map.Entry<String, QueueEvent>> entrySet = completedTasks.entrySet();
     AtomicBoolean sessionExpired = new AtomicBoolean();
     AtomicBoolean interrupted = new AtomicBoolean();
+    // TODO: async
     try (ParWork work = new ParWork(this, true, true)) {
       for (Map.Entry<String, QueueEvent> entry : entrySet) {
-        work.collect("cleanWorkQueue", ()->{
+        work.collect("cleanWorkQueue", () -> {
           try {
             workQueue.remove(entry.getValue());
           } catch (KeeperException.SessionExpiredException e) {
@@ -304,11 +314,12 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
           } catch (InterruptedException e) {
             interrupted.set(true);
           } catch (KeeperException e) {
-           log.error("Exception removing item from workQueue", e);
+            log.error("Exception removing item from workQueue", e);
           }
-          runningTasks.remove(entry.getKey());});
-          completedTasks.remove(entry.getKey());
+          runningTasks.remove(entry.getKey());
+        });
       }
+
     }
 
     if (interrupted.get()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
index 9078f3c..38c019e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskQueue.java
@@ -16,9 +16,13 @@
  */
 package org.apache.solr.cloud;
 
+import java.io.File;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
@@ -48,7 +52,7 @@ import org.slf4j.LoggerFactory;
 public class OverseerTaskQueue extends ZkDistributedQueue {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private static final String RESPONSE_PREFIX = "qnr-" ;
+  public static final String RESPONSE_PREFIX = "qnr-" ;
 
   private final LongAdder pendingResponses = new LongAdder();
 
@@ -76,28 +80,47 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
    */
   public boolean containsTaskWithRequestId(String requestIdKey, String requestId)
       throws KeeperException, InterruptedException {
+    Set<String> childNames;
+        updateLock.lockInterruptibly();
+    try {
 
-    List<String> childNames = zookeeper.getChildren(dir, null, true);
-    stats.setQueueLength(childNames.size());
-    for (String childName : childNames) {
-      if (childName != null && childName.startsWith(PREFIX)) {
-        try {
-          byte[] data = zookeeper.getData(dir + "/" + childName, null, null, true);
-          if (data != null) {
-            ZkNodeProps message = ZkNodeProps.load(data);
-            if (message.containsKey(requestIdKey)) {
-              if (log.isDebugEnabled()) {
-                log.debug("Looking for {}, found {}", message.get(requestIdKey), requestId);
+     childNames = knownChildren.keySet();
+
+
+      stats.setQueueLength(childNames.size());
+      for (String childName : childNames) {
+        if (childName != null && childName.startsWith(PREFIX)) {
+          try {
+            byte[] data = knownChildren.get(childName);
+            if (data == null) {
+              data = zookeeper.getData(dir + "/" + childName, null, null, true);
+              if (data != null) {
+                knownChildren.put(childName, data);
+              }
+            }
+            if (data != null) {
+              ZkNodeProps message = ZkNodeProps.load(data);
+              if (message.containsKey(requestIdKey)) {
+                if (log.isDebugEnabled()) {
+                  log.debug("Looking for {}, found {}", message.get(requestIdKey), requestId);
+                }
+                if(message.get(requestIdKey).equals(requestId)) return true;
               }
-              if(message.get(requestIdKey).equals(requestId)) return true;
             }
+          } catch (KeeperException.NoNodeException e) {
+            knownChildren.remove(childName);
           }
-        } catch (KeeperException.NoNodeException e) {
-          // Another client removed the node first, try next
         }
       }
+
+    } finally {
+      if (updateLock.isHeldByCurrentThread()) {
+        updateLock.unlock();
+      }
     }
 
+
+
     return false;
   }
 
@@ -109,8 +132,9 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
     Timer.Context time = stats.time(dir + "_remove_event");
     try {
       String path = dir + "/" + event.getId();
-      String responsePath = dir + "/" + RESPONSE_PREFIX
+      String responseId = RESPONSE_PREFIX
           + path.substring(path.lastIndexOf("-") + 1);
+      String responsePath = dir + "/" + responseId;
 
       try {
         zookeeper.setData(responsePath, event.getBytes(), true);
@@ -122,6 +146,16 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
         zookeeper.delete(path, -1, true);
       } catch (KeeperException.NoNodeException ignored) {
       }
+
+      updateLock.lockInterruptibly();
+      try {
+        knownChildren.remove(event.getId());
+        knownChildren.put(responseId, event.getBytes());
+      } finally {
+        if (updateLock.isHeldByCurrentThread()) {
+          updateLock.unlock();
+        }
+      }
     } finally {
       time.stop();
     }
@@ -277,7 +311,7 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
     try {
       for (Pair<String, byte[]> element : peekElements(n, waitMillis, excludeSet)) {
         if (log.isDebugEnabled()) log.debug("Add to topN {}", dir + "/" + element.first());
-        topN.add(new QueueEvent(element.first(),
+        topN.add(new QueueEvent(new File(element.first()).getName(),
             element.second(), null));
       }
       printQueueEventsListElementIds(topN);
@@ -305,16 +339,44 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
    */
   public String getTailId() throws KeeperException, InterruptedException {
     // TODO: could we use getChildren here?  Unsure what freshness guarantee the caller needs.
-    TreeSet<String> orderedChildren = fetchZkChildren(null, null);
+    TreeMap<String,byte[]> orderedChildren = fetchZkChildren(null, null);
 
-    for (String headNode : orderedChildren.descendingSet())
+    for (Map.Entry<String,byte[]>  headNode : orderedChildren.entrySet())
       if (headNode != null) {
         try {
-          QueueEvent queueEvent = new QueueEvent(headNode, zookeeper.getData(dir + "/" + headNode,
-              null, null, true), null);
+          byte[] data;
+          updateLock.lockInterruptibly();
+          try {
+            data = knownChildren.get(headNode.getKey());
+          } finally {
+            if (updateLock.isHeldByCurrentThread()) {
+              updateLock.unlock();
+            }
+          }
+          if (data == null) {
+            data = zookeeper.getData(dir + "/" + headNode.getKey(), null, null, true);
+          }
+          QueueEvent queueEvent = new QueueEvent(headNode.getKey(), data, null);
+
+          updateLock.lockInterruptibly();
+          try {
+            knownChildren.put(headNode.getKey(), data);
+          } finally {
+            if (updateLock.isHeldByCurrentThread()) {
+              updateLock.unlock();
+            }
+          }
+
           return queueEvent.getId();
         } catch (KeeperException.NoNodeException e) {
-          // Another client removed the node first, try next
+          updateLock.lockInterruptibly();
+          try {
+            knownChildren.remove(headNode.getKey());
+          } finally {
+            if (updateLock.isHeldByCurrentThread()) {
+              updateLock.unlock();
+            }
+          }
         }
       }
     return null;
@@ -346,7 +408,7 @@ public class OverseerTaskQueue extends ZkDistributedQueue {
     private volatile  byte[] bytes;
 
     QueueEvent(String id, byte[] bytes, WatchedEvent event) {
-      if (log.isDebugEnabled()) log.debug("Create QueueEvent with id {}", id);
+      if (log.isDebugEnabled()) log.debug("Create QueueEvent with id {} {} {}", id, bytes != null ? bytes.length : 0, event);
       this.id = id;
       this.bytes = bytes;
       this.event = event;
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
index b29768c..140443e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
@@ -16,15 +16,18 @@
  */
 package org.apache.solr.cloud;
 
+import java.io.File;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -98,7 +101,7 @@ public class ZkDistributedQueue implements DistributedQueue {
    * Therefore, methods like {@link #peek()} have to double-check actual node existence, and methods
    * like {@link #poll()} must resolve any races by attempting to delete the underlying node.
    */
-  protected volatile TreeSet<String> knownChildren;
+  protected volatile TreeMap<String,byte[]> knownChildren;
 
   /**
    * Used to wait on ZK changes to the child list; you must hold {@link #updateLock} before waiting on this condition.
@@ -134,14 +137,14 @@ public class ZkDistributedQueue implements DistributedQueue {
     try {
       try {
         updateLock.lockInterruptibly();
-        knownChildren = fetchZkChildren(null, null);
+        fetchZkChildren(null, null);
       } catch (KeeperException e) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
       } catch (InterruptedException e) {
         ParWork.propagateInterrupt(e);
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
       }
-    }finally {
+    } finally {
       if (updateLock.isHeldByCurrentThread()) {
         updateLock.unlock();
       }
@@ -186,39 +189,38 @@ public class ZkDistributedQueue implements DistributedQueue {
   @Override
   public byte[] peek(long wait) throws KeeperException, InterruptedException {
     byte[] result = null;
-    try {
-      Timer.Context time;
-      if (wait == Long.MAX_VALUE) {
-        time = stats.time(dir + "_peek_wait_forever");
-      } else {
-        time = stats.time(dir + "_peek_wait" + wait);
-      }
 
-      long waitNanos = TimeUnit.MILLISECONDS.toNanos(wait);
+    Timer.Context time;
+    if (wait == Long.MAX_VALUE) {
+      time = stats.time(dir + "_peek_wait_forever");
+    } else {
+      time = stats.time(dir + "_peek_wait" + wait);
+    }
 
-      result = firstElement();
-      if (result != null) {
-        return result;
-      }
+    long waitNanos = TimeUnit.MILLISECONDS.toNanos(wait);
 
-      ChildWatcher watcher = new ChildWatcher();
-      TreeSet<String> foundChildren = fetchZkChildren(watcher, null);
+    result = firstElement();
+    if (result != null) {
+      return result;
+    }
 
-      TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
+    ChildWatcher watcher = new ChildWatcher();
+    TreeMap<String,byte[]> foundChildren = fetchZkChildren(watcher, null);
 
-      waitForChildren(null, foundChildren, waitNanos, timeout, watcher);
-      if (foundChildren.size() == 0) {
-        return null;
-      }
+    if (foundChildren.size() > 0) {
       result = firstElement();
       return result;
-    } finally {
-      try {
-        zookeeper.getSolrZooKeeper().removeAllWatches(dir, Watcher.WatcherType.Children, false);
-      } catch (Exception e) {
-        log.info(e.getMessage());
-      }
     }
+
+    TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
+
+    waitForChildren(null, foundChildren, waitNanos, timeout, watcher);
+    if (foundChildren.size() == 0) {
+      return null;
+    }
+    result = firstElement();
+    return result;
+
   }
 
   /**
@@ -256,9 +258,22 @@ public class ZkDistributedQueue implements DistributedQueue {
     }
   }
 
+  // TODO: use async
   public void remove(Collection<String> paths) throws KeeperException, InterruptedException {
     if (log.isDebugEnabled()) log.debug("Remove paths from queue {} {}", dir, paths);
     if (paths.isEmpty()) return;
+
+    updateLock.lockInterruptibly();
+    try {
+      for (String path : paths) {
+        knownChildren.remove(path);
+      }
+    } finally {
+      if (updateLock.isHeldByCurrentThread()) {
+        updateLock.unlock();
+      }
+    }
+
     List<Op> ops = new ArrayList<>();
     for (String path : paths) {
       ops.add(Op.delete(dir + "/" + path, -1));
@@ -283,16 +298,6 @@ public class ZkDistributedQueue implements DistributedQueue {
         }
       }
     }
-    updateLock.lockInterruptibly();
-    try {
-      for (String path : paths) {
-        knownChildren.remove(path);
-      }
-    } finally {
-      if (updateLock.isHeldByCurrentThread()) {
-        updateLock.unlock();
-      }
-    }
   }
 
   /**
@@ -314,7 +319,7 @@ public class ZkDistributedQueue implements DistributedQueue {
       }
 
       ChildWatcher watcher = new ChildWatcher();
-      TreeSet<String> foundChildren = fetchZkChildren(watcher, null);
+      TreeMap<String,byte[]> foundChildren = fetchZkChildren(watcher, null);
 
       TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
 
@@ -410,18 +415,17 @@ public class ZkDistributedQueue implements DistributedQueue {
 
   /**
    * Returns the name if the first known child node, or {@code null} if the queue is empty.
-   * This is the only place {@link #knownChildren} is ever updated!
-   * The caller must double check that the actual node still exists, since the in-memory
-   * list is inherently stale.
+   * @return
    */
-  private String firstChild(boolean remove) {
+  private Map.Entry<String,byte[]> firstChild(boolean remove) {
     try {
       updateLock.lockInterruptibly();
       try {
         // We always return from cache first, the cache will be cleared if the node is not exist
         if (!knownChildren.isEmpty()) {
-          return remove ? knownChildren.pollFirst() : knownChildren.first();
+          return remove ? knownChildren.pollFirstEntry() : knownChildren.firstEntry();
         }
+
         return null;
       } finally {
         if (updateLock.isHeldByCurrentThread()) {
@@ -437,13 +441,47 @@ public class ZkDistributedQueue implements DistributedQueue {
   /**
    * Return the current set of children from ZK; does not change internal state.
    */
-  TreeSet<String> fetchZkChildren(Watcher watcher, Predicate<String> acceptFilter) throws KeeperException {
-    TreeSet<String> orderedChildren = new TreeSet<>();
+  TreeMap<String,byte[]> fetchZkChildren(Watcher watcher, Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
+
+    TreeMap<String,byte[]> orderedChildren = new TreeMap<>();
+    updateLock.lockInterruptibly();
+    try {
+      if (knownChildren != null && !knownChildren.isEmpty()) {
+        Set<Map.Entry<String,byte[]>> entrySet = knownChildren.entrySet();
+        for (Map.Entry<String,byte[]> entry : entrySet) {
+          String childName = entry.getKey();
+          // Check format
+          if (!childName.startsWith(PREFIX)) {
+
+            // responses can be written to same queue with different naming scheme
+            if (log.isDebugEnabled()) log.debug("Found child node with improper name: {}, prefix={}", childName, PREFIX);
+            continue;
+          }
+          if (acceptFilter != null && acceptFilter.test(childName)) {
+            if (log.isDebugEnabled()) log.debug("Found child that matched exclude filter: {}", dir + "/" + childName);
+            continue;
+          }
+
+          orderedChildren.put(childName, entry.getValue());
+        }
+      }
+    } finally {
+      if (updateLock.isHeldByCurrentThread()) {
+        updateLock.unlock();
+      }
+    }
+
+    if (!orderedChildren.isEmpty()) {
+      return orderedChildren;
+    }
+
+    TreeMap<String,byte[]> remoteKnownChildren = new TreeMap<>();
     try {
       List<String> childNames = zookeeper.getChildren(dir, watcher, true);
       stats.setQueueLength(childNames.size());
       for (String childName : childNames) {
         if (log.isDebugEnabled()) log.debug("Examine child: {} out of {} {}", childName, childNames.size(), acceptFilter);
+        remoteKnownChildren.put(childName, null);
         // Check format
         if (!childName.regionMatches(0, PREFIX, 0, PREFIX.length())) {
 
@@ -456,8 +494,9 @@ public class ZkDistributedQueue implements DistributedQueue {
           continue;
         }
         if (log.isDebugEnabled()) log.debug("Add child to fetched children: {}", childName);
-        orderedChildren.add(childName);
+        orderedChildren.put(childName, null);
       }
+      updateKnownChildren(remoteKnownChildren);
       return orderedChildren;
     } catch (InterruptedException e) {
       ParWork.propagateInterrupt(e);
@@ -465,6 +504,28 @@ public class ZkDistributedQueue implements DistributedQueue {
     }
   }
 
+  private void updateKnownChildren(TreeMap<String,byte[]> children) {
+    updateLock.lock();
+    try {
+      Set<Map.Entry<String,byte[]>> entrySet = children.entrySet();
+      for (Map.Entry<String,byte[]> entry : entrySet) {
+        String childName = entry.getKey();
+        byte[] data = knownChildren == null ? null : knownChildren.get(childName);
+        if (data != null) {
+          if (childName.startsWith("/")) {
+            throw new IllegalArgumentException(childName);
+          }
+          children.put(childName, data);
+        }
+      }
+      knownChildren = children;
+    } catch (Exception e) {
+      log.error("", e);
+    } finally {
+      updateLock.unlock();
+    }
+  }
+
   /**
    * Return the currently-known set of elements, using child names from memory. If no children are found, or no
    * children pass {@code acceptFilter}, waits up to {@code waitMillis} for at least one child to become available.
@@ -472,11 +533,11 @@ public class ZkDistributedQueue implements DistributedQueue {
    * Package-private to support {@link OverseerTaskQueue} specifically.</p>
    */
   @Override
-  public Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws KeeperException {
+  public Collection<Pair<String, byte[]>> peekElements(int max, long waitMillis, Predicate<String> acceptFilter) throws KeeperException, InterruptedException {
     if (log.isDebugEnabled()) log.debug("peekElements {} {}", max, acceptFilter);
     List<Pair<String,byte[]>> result = null;
     ChildWatcher watcher = new ChildWatcher();
-    TreeSet<String> foundChildren = fetchZkChildren(watcher, acceptFilter);
+    TreeMap<String,byte[]> foundChildren = fetchZkChildren(watcher, acceptFilter);
     long waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
     TimeOut timeout = new TimeOut(waitNanos, TimeUnit.NANOSECONDS, TimeSource.NANO_TIME);
     try {
@@ -487,98 +548,122 @@ public class ZkDistributedQueue implements DistributedQueue {
       // Technically we could restart the method if we fasil to actually obtain any valid children
       // from ZK, but this is a super rare case, and the latency of the ZK fetches would require
       // much more sophisticated waitNanos tracking.
-      result = new ArrayList<>(foundChildren.size());
-      for (String child : foundChildren) {
+      result = Collections.synchronizedList(new ArrayList<>(foundChildren.size()));
+      Set<String> dataPaths = new HashSet<>();
+      for (Map.Entry<String,byte[]> child : foundChildren.entrySet()) {
         if (result.size() >= max) {
           break;
         }
 
+        byte[] data;
+        updateLock.lockInterruptibly();
         try {
-          byte[] data = zookeeper.getData(dir + "/" + child, null, null, true);
-          if (log.isDebugEnabled()) log.debug("get data for child={}", child);
-          result.add(new Pair<>(child, data));
-        } catch (KeeperException.NoNodeException e) {
-          if (log.isDebugEnabled()) log.debug("no node found for child={}", child);
-          updateLock.lockInterruptibly();
-          try {
-            knownChildren.remove(child);
-          } finally {
-            if (updateLock.isHeldByCurrentThread()) {
-              updateLock.unlock();
-            }
+          data = knownChildren.get(child.getKey());
+        } finally {
+          if (updateLock.isHeldByCurrentThread()) {
+            updateLock.unlock();
           }
         }
+
+        if (data == null) {
+          // nocommit - lets not reget what we have in knownChildren, also, use asyncp
+          dataPaths.add(dir + "/" + child.getKey());
+          if (log.isDebugEnabled()) log.debug("get data for child={}", child.getKey());
+        } else {
+          result.add(new Pair<>(child.getKey(), data));
+        }
       }
-      return result;
+
+      Map<String,byte[]> dataMap = zookeeper.getData(dataPaths);
+      updateLock.lockInterruptibly();
+      try {
+        List<Pair<String,byte[]>> finalResult = result;
+        dataMap.forEach((k, bytes) -> {
+          finalResult.add(new Pair<>(k, bytes));
+          if (bytes != null) {
+            knownChildren.put(new File(k).getName(), bytes);
+          } else {
+            knownChildren.remove(new File(k).getName());
+          }
+        });
+      } finally {
+        if (updateLock.isHeldByCurrentThread()) {
+          updateLock.unlock();
+        }
+      }
+
+      return new ArrayList<>(result);
     } catch (InterruptedException e) {
       ParWork.propagateInterrupt(e);
       throw new AlreadyClosedException(e);
-    } finally {
-      try {
-        zookeeper.getSolrZooKeeper().removeAllWatches(dir, Watcher.WatcherType.Children, false);
-      } catch (Exception e) {
-        log.info(e.getMessage());
-      }
     }
   }
 
-  private void waitForChildren(Predicate<String> acceptFilter, TreeSet<String> foundChildren, long waitNanos, TimeOut timeout, ChildWatcher watcher) throws InterruptedException, KeeperException {
+  private void waitForChildren(Predicate<String> acceptFilter, TreeMap<String,byte[]> foundChildren, long waitNanos, TimeOut timeout, ChildWatcher watcher) throws InterruptedException, KeeperException {
     if (log.isDebugEnabled()) log.debug("wait for children ... {}", waitNanos);
-    updateLock.lockInterruptibly();
     try {
-      for (String child : knownChildren) {
-        if (acceptFilter == null || !acceptFilter.test(child)) {
-          foundChildren.add(child);
-        }
-      }
-    } finally {
-      if (updateLock.isHeldByCurrentThread()) {
-        updateLock.unlock();
-      }
-    }
-    if (!foundChildren.isEmpty()) {
-      return;
-    }
-    if (waitNanos <= 0) {
-      if (log.isDebugEnabled()) log.debug("0 wait time and no children found, return");
-      return;
-    }
-    TreeSet<String> fc = null;
-    while (fc == null || fc.isEmpty()) {
-      if (watcher.fired) {
-        watcher.fired = false;
-        fc = fetchZkChildren(watcher, acceptFilter);
-        if (!fc.isEmpty()) {
-          foundChildren.addAll(fc);
-          return;
-        }
-      }
       updateLock.lockInterruptibly();
       try {
-        try {
-          changed.await(10, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-          ParWork.propagateInterrupt(e);
-        }
-        if (zookeeper.isClosed() || !zookeeper.isConnected()) {
-          throw new AlreadyClosedException();
-        }
-        if (timeout.hasTimedOut()) {
-          return;
-        }
-        for (String child : knownChildren) {
-          if (acceptFilter == null || !acceptFilter.test(child)) {
-            foundChildren.add(child);
+        for (Map.Entry<String,byte[]> child : knownChildren.entrySet()) {
+          if (acceptFilter == null || !acceptFilter.test(child.getKey())) {
+            foundChildren.put(child.getKey(), child.getValue());
           }
         }
-        if (!foundChildren.isEmpty()) {
-          return;
-        }
       } finally {
         if (updateLock.isHeldByCurrentThread()) {
           updateLock.unlock();
         }
       }
+      if (!foundChildren.isEmpty()) {
+        return;
+      }
+      if (waitNanos <= 0) {
+        if (log.isDebugEnabled()) log.debug("0 wait time and no children found, return");
+        return;
+      }
+      TreeMap<String,byte[]> fc = null;
+      while (fc == null || fc.isEmpty()) {
+        if (watcher.fired) {
+          watcher.fired = false;
+          fc = fetchZkChildren(watcher, acceptFilter);
+          if (!fc.isEmpty()) {
+            foundChildren.putAll(fc);
+            return;
+          }
+        }
+        updateLock.lockInterruptibly();
+        try {
+          try {
+            changed.await(10, TimeUnit.MILLISECONDS);
+          } catch (InterruptedException e) {
+            ParWork.propagateInterrupt(e);
+          }
+          if (zookeeper.isClosed() || !zookeeper.isConnected()) {
+            throw new AlreadyClosedException();
+          }
+          if (timeout.hasTimedOut()) {
+            return;
+          }
+          for (Map.Entry<String,byte[]> child : knownChildren.entrySet()) {
+            if (acceptFilter == null || !acceptFilter.test(child.getKey())) {
+              foundChildren.put(child.getKey(), child.getValue());
+            }
+          }
+          if (!foundChildren.isEmpty()) {
+            return;
+          }
+        } finally {
+          if (updateLock.isHeldByCurrentThread()) {
+            updateLock.unlock();
+          }
+        }
+      }
+    } finally {
+      try {
+        zookeeper.getSolrZooKeeper().removeAllWatches(dir, Watcher.WatcherType.Children, false);
+      } catch (Exception e) {
+        log.info(e.getMessage());
+      }
     }
   }
 
@@ -590,16 +675,48 @@ public class ZkDistributedQueue implements DistributedQueue {
   private byte[] firstElement() throws KeeperException {
     try {
 
-        String firstChild = null;
-        firstChild = firstChild(false);
-        if (firstChild == null) {
-          return null;
+      Map.Entry<String,byte[]> firstChild = firstChild(false);
+      if (firstChild == null) {
+        return null;
+      }
+      byte[] data;
+      updateLock.lockInterruptibly();
+      try {
+        data = knownChildren.get(firstChild.getKey());
+        if (data != null) {
+          return data;
+        }
+      } finally {
+        if (updateLock.isHeldByCurrentThread()) {
+          updateLock.unlock();
+        }
+      }
+
+      try {
+        data = zookeeper.getData(dir + "/" + firstChild.getKey(), null, null, true);
+        if (data != null) {
+          updateLock.lockInterruptibly();
+          try {
+            knownChildren.put(firstChild.getKey(), data);
+          } finally {
+            if (updateLock.isHeldByCurrentThread()) {
+              updateLock.unlock();
+            }
+          }
         }
+
+        return data;
+      } catch (KeeperException.NoNodeException e) {
+        updateLock.lockInterruptibly();
         try {
-          return zookeeper.getData(dir + "/" + firstChild, null, null, true);
-        } catch (KeeperException.NoNodeException e) {
-          return null;
+          knownChildren.remove(firstChild.getKey());
+        } finally {
+          if (updateLock.isHeldByCurrentThread()) {
+            updateLock.unlock();
+          }
         }
+        return null;
+      }
 
     } catch (InterruptedException e) {
       ParWork.propagateInterrupt(e);
@@ -609,14 +726,37 @@ public class ZkDistributedQueue implements DistributedQueue {
 
   private byte[] removeFirst() throws KeeperException {
     try {
-      String firstChild = firstChild(true);
+      Map.Entry<String,byte[]> firstChild = firstChild(true);
       if (firstChild == null) {
         return null;
       }
+
+      byte[] data;
+      updateLock.lockInterruptibly();
       try {
-        String path = dir + "/" + firstChild;
+        data = knownChildren.get(firstChild.getKey());
+        if (data != null) {
+          return data;
+        }
+      } finally {
+        if (updateLock.isHeldByCurrentThread()) {
+          updateLock.unlock();
+        }
+      }
+
+      try {
+        String path = dir + "/" + firstChild.getKey();
         byte[] result = zookeeper.getData(path, null, null, true);
         zookeeper.delete(path, -1, true);
+        updateLock.lockInterruptibly();
+        try {
+          knownChildren.remove(firstChild.getKey());
+
+        } finally {
+          if (updateLock.isHeldByCurrentThread()) {
+            updateLock.unlock();
+          }
+        }
         return result;
       } catch (KeeperException.NoNodeException e) {
         return null;
@@ -646,10 +786,10 @@ public class ZkDistributedQueue implements DistributedQueue {
       if (event.getType() == Event.EventType.NodeChildrenChanged) {
         updateLock.lock();
         try {
-          knownChildren = fetchZkChildren(null, null);
+          fetchZkChildren(null, null);
           fired = true;
           changed.signalAll();
-        } catch (KeeperException e) {
+        } catch (KeeperException | InterruptedException e) {
           log.error("", e);
         } finally {
           updateLock.unlock();
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 405083e..88c9bbc 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -41,6 +41,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
@@ -727,6 +728,43 @@ public class SolrZkClient implements Closeable {
     }
   }
 
+  public Map<String,byte[]> getData(Set<String> paths) {
+    // Reads all paths concurrently; nodes that are missing or fail to read are absent from the result.
+    Map<String,byte[]> dataMap = new ConcurrentHashMap<>(paths.size());
+    CountDownLatch latch = new CountDownLatch(paths.size());
+    for (String path : paths) {
+      ZooKeeper keeper = connManager.getKeeper();
+      assert keeper != null;
+      keeper.getData(path, false, (rc, path1, ctx, data, stat) -> {
+        try {
+          final KeeperException.Code keCode = KeeperException.Code.get(rc);
+          if (keCode == KeeperException.Code.NONODE) {
+            if (log.isDebugEnabled()) log.debug("No node found for {}", path1);
+          } else if (keCode != KeeperException.Code.OK) {
+            log.warn("Failed to read data for {}: {}", path1, keCode);
+          } else if (data != null) { // ConcurrentHashMap rejects null values
+            dataMap.put(path1, data);
+          }
+        } finally {
+          latch.countDown(); // always count down, or a failed read stalls the caller until timeout
+        }
+      }, null);
+    }
+    boolean success;
+    try {
+      success = latch.await(15, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      ParWork.propagateInterrupt(e);
+      log.error("getData(Set<String>=" + paths + ")", e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+    // A timeout means some reads never completed; fail rather than return a partial map.
+    if (!success) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting for operations to complete");
+    }
+    return dataMap;
+  }
+
   // Calls setData for a list of existing paths in parallel
   private void updateExistingPaths(List<String> pathsToUpdate, Map<String,byte[]> dataMap) throws KeeperException {
     final KeeperException[] keeperExceptions = new KeeperException[1];