Posted to commits@druid.apache.org by le...@apache.org on 2019/05/03 13:58:49 UTC

[incubator-druid] branch master updated: Improve parallelism of zookeeper based segment change processing (#7088)

This is an automated email from the ASF dual-hosted git repository.

leventov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new afbcb9c  Improve parallelism of zookeeper based segment change processing (#7088)
afbcb9c is described below

commit afbcb9c07f21c12f241f1dc6575589ef14cda836
Author: Samarth Jain <sa...@apache.org>
AuthorDate: Fri May 3 06:58:42 2019 -0700

    Improve parallelism of zookeeper based segment change processing (#7088)
    
    * V1 - improve parallelism of zookeeper based segment change processing
    
    * Create zk nodes in batches. Address code review comments.
    Introduce various configs.
    
    * Add documentation for the newly added configs
    
    * Fix test failures
    
    * Fix more test failures
    
    * Remove printStackTrace statements
    
    * Address code review comments
    
    * Use a single queue
    
    * Address code review comments
    
    Since we have a separate load peon for every historical, just having a single SegmentChangeProcessor
    task per historical is enough. This commit also gets rid of the associated config
    druid.coordinator.loadqueuepeon.curator.numCreateThreads. (A sketch of this pattern follows the changed-file list below.)
    
    * Resolve merge conflict
    
    * Fix compilation failure
    
    * Remove batching since we already have a dynamic config maxSegmentsInNodeLoadingQueue that provides that control
    
    * Fix NPE in test
    
    * Remove documentation for configs that are no longer needed
    
    * Address code review comments
    
    * Address more code review comments
    
    * Fix checkstyle issue
    
    * Address code review comments
    
    * Code review comments
    
    * Add back monitor node remove executor
    
    * Clean up code to isolate null checks and minor refactoring
    
    * Change param name since it conflicts with member variable name
---
 docs/content/configuration/index.md                |   4 +-
 .../druid/segment/loading/SegmentLoaderConfig.java |   3 +-
 .../druid/server/coordination/ZkCoordinator.java   | 119 +++---
 .../server/coordinator/CuratorLoadQueuePeon.java   | 420 ++++++++++-----------
 .../server/coordinator/DruidCoordinatorConfig.java |   6 +
 .../server/coordination/ZkCoordinatorTest.java     |   3 +-
 .../coordinator/CuratorDruidCoordinatorTest.java   |   3 +-
 .../server/coordinator/DruidCoordinatorTest.java   |   3 +-
 .../server/coordinator/HttpLoadQueuePeonTest.java  |   3 +-
 .../server/coordinator/LoadQueuePeonTest.java      | 203 +++++-----
 .../server/coordinator/LoadQueuePeonTester.java    |  24 +-
 .../coordinator/TestDruidCoordinatorConfig.java    |   7 +-
 .../helper/DruidCoordinatorSegmentKillerTest.java  |   3 +-
 .../java/org/apache/druid/cli/CliCoordinator.java  |  13 +-
 14 files changed, 429 insertions(+), 385 deletions(-)
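
For context on the design noted in the commit message, the sketch below is a minimal, self-contained
illustration (plain java.util.concurrent, no Curator or Druid classes) of the overall shift this patch
makes: instead of one periodically scheduled processor draining a queue serially, each change request
becomes its own task submitted to a bounded pool. The names in it (ChangeRequest, processOne) are
hypothetical stand-ins, not classes from this commit.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Minimal sketch: instead of one loop that drains a queue serially, each
// change request becomes its own task on a bounded pool, so independent
// requests for different segments can make progress in parallel.
public class ParallelChangeProcessing
{
  // Hypothetical stand-in for a segment load/drop request.
  static final class ChangeRequest
  {
    final String segmentId;

    ChangeRequest(String segmentId)
    {
      this.segmentId = segmentId;
    }
  }

  public static void main(String[] args) throws InterruptedException
  {
    int numThreads = Runtime.getRuntime().availableProcessors();
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);

    for (int i = 0; i < 10; i++) {
      final ChangeRequest request = new ChangeRequest("segment-" + i);
      // One task per request; processing across segments is no longer serialized.
      pool.submit(() -> processOne(request));
    }

    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
  }

  private static void processOne(ChangeRequest request)
  {
    // In the real code this is where the ZK node for the request would be
    // created and a deletion watcher registered; here we just log.
    System.out.println(Thread.currentThread().getName() + " handled " + request.segmentId);
  }
}

The actual patch applies this idea in two places: ZkCoordinator offloads CHILD_ADDED handling on the
historical side, and CuratorLoadQueuePeon submits one SegmentChangeProcessor per request on the
coordinator side.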

diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md
index 91edb82..29c11fa 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -1254,8 +1254,8 @@ These Historical configurations can be defined in the `historical/runtime.proper
 |`druid.segmentCache.dropSegmentDelayMillis`|How long a process delays before completely dropping segment.|30000 (30 seconds)|
 |`druid.segmentCache.infoDir`|Historical processes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the Coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir|
 |`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. Set this value to zero to wait for all segments to be loaded before announcing.|5000 (5 seconds)|
-|`druid.segmentCache.numLoadingThreads`|How many segments to drop or load concurrently from from deep storage.|10|
-|`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently from local storage at startup.|Same as numLoadingThreads|
+|`druid.segmentCache.numLoadingThreads`|How many segments to drop or load concurrently from deep storage. Note that loading a segment involves downloading it from deep storage, decompressing it, and loading it into a memory-mapped location, so the work is not entirely I/O bound. Depending on CPU and network load, this value can be increased.|Number of cores|
+|`druid.coordinator.loadqueuepeon.curator.numCallbackThreads`|Number of threads for executing callback actions associated with loading or dropping segments. Consider increasing this number if the cluster lags behind in balancing segments across historical nodes.|2|
 
 In `druid.segmentCache.locations`, *freeSpacePercent* was added because the *maxSize* setting is only a theoretical limit and assumes that much space will always be available for storing segments. In case a Druid bug leaves unaccounted segment files on disk, or some other process writes to the disk, this check can fail segment loading early, before the disk fills up completely, leaving the host otherwise usable.
 
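The two rows added above are tuning guidance. As a rough illustration only (the pool construction below
is hypothetical and is not how Druid wires these configs), loading threads on a historical default to one
per core because segment loading mixes downloading, decompression and memory mapping, while the
coordinator-side callback pool stays small because callbacks are lightweight bookkeeping.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hedged sketch of the sizing described in the docs table above; the pools
// here are illustrative, not actual Druid configuration objects.
public class PoolSizingSketch
{
  public static void main(String[] args)
  {
    // numLoadingThreads now defaults to the number of cores, since loading
    // is a mix of download, decompression and memory mapping.
    int numLoadingThreads = Runtime.getRuntime().availableProcessors();
    ExecutorService loadingPool = Executors.newFixedThreadPool(numLoadingThreads);

    // numCallbackThreads defaults to 2; callbacks are cheap, so a small pool
    // is usually enough unless segment balancing falls behind.
    int numCallbackThreads = 2;
    ExecutorService callbackPool = Executors.newFixedThreadPool(numCallbackThreads);

    System.out.println("loading threads = " + numLoadingThreads + ", callback threads = " + numCallbackThreads);

    loadingPool.shutdown();
    callbackPool.shutdown();
  }
}
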
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
index b09bd0c..80f0fbc 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
@@ -22,6 +22,7 @@ package org.apache.druid.segment.loading;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.Lists;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.utils.JvmUtils;
 import org.hibernate.validator.constraints.NotEmpty;
 
 import java.io.File;
@@ -46,7 +47,7 @@ public class SegmentLoaderConfig
   private int announceIntervalMillis = 0; // do not background announce
 
   @JsonProperty("numLoadingThreads")
-  private int numLoadingThreads = 10;
+  private int numLoadingThreads = JvmUtils.getRuntimeInfo().getAvailableProcessors();
 
   @JsonProperty("numBootstrapThreads")
   private Integer numBootstrapThreads = null;
diff --git a/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java
index fcf86b3..ca56b10 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/ZkCoordinator.java
@@ -32,9 +32,11 @@ import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.segment.loading.SegmentLoaderConfig;
 import org.apache.druid.server.initialization.ZkPathsConfig;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
 
 /**
  * Use {@link org.apache.druid.server.coordinator.HttpLoadQueuePeon} for segment load/drops.
@@ -54,6 +56,7 @@ public class ZkCoordinator
 
   private volatile PathChildrenCache loadQueueCache;
   private volatile boolean started = false;
+  private final ExecutorService segmentLoadUnloadService;
 
   @Inject
   public ZkCoordinator(
@@ -61,7 +64,8 @@ public class ZkCoordinator
       ObjectMapper jsonMapper,
       ZkPathsConfig zkPaths,
       DruidServerMetadata me,
-      CuratorFramework curator
+      CuratorFramework curator,
+      SegmentLoaderConfig config
   )
   {
     this.dataSegmentChangeHandler = loadDropHandler;
@@ -69,6 +73,10 @@ public class ZkCoordinator
     this.zkPaths = zkPaths;
     this.me = me;
     this.curator = curator;
+    this.segmentLoadUnloadService = Execs.multiThreaded(
+        config.getNumLoadingThreads(),
+        "ZKCoordinator--%d"
+    );
   }
 
   @LifecycleStart
@@ -102,63 +110,12 @@ public class ZkCoordinator
             new PathChildrenCacheListener()
             {
               @Override
-              public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
+              public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
               {
                 final ChildData child = event.getData();
                 switch (event.getType()) {
                   case CHILD_ADDED:
-                    final String path = child.getPath();
-                    final DataSegmentChangeRequest request = jsonMapper.readValue(
-                        child.getData(), DataSegmentChangeRequest.class
-                    );
-
-                    log.info("New request[%s] with zNode[%s].", request.asString(), path);
-
-                    try {
-                      request.go(
-                          dataSegmentChangeHandler,
-                          new DataSegmentChangeCallback()
-                          {
-                            boolean hasRun = false;
-
-                            @Override
-                            public void execute()
-                            {
-                              try {
-                                if (!hasRun) {
-                                  curator.delete().guaranteed().forPath(path);
-                                  log.info("Completed request [%s]", request.asString());
-                                  hasRun = true;
-                                }
-                              }
-                              catch (Exception e) {
-                                try {
-                                  curator.delete().guaranteed().forPath(path);
-                                }
-                                catch (Exception e1) {
-                                  log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
-                                }
-                                log.error(e, "Exception while removing zNode[%s]", path);
-                                throw new RuntimeException(e);
-                              }
-                            }
-                          }
-                      );
-                    }
-                    catch (Exception e) {
-                      try {
-                        curator.delete().guaranteed().forPath(path);
-                      }
-                      catch (Exception e1) {
-                        log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
-                      }
-
-                      log.makeAlert(e, "Segment load/unload: uncaught exception.")
-                         .addData("node", path)
-                         .addData("nodeProperties", request)
-                         .emit();
-                    }
-
+                    childAdded(child);
                     break;
                   case CHILD_REMOVED:
                     log.info("zNode[%s] was removed", event.getData().getPath());
@@ -168,6 +125,7 @@ public class ZkCoordinator
                 }
               }
             }
+
         );
         loadQueueCache.start();
       }
@@ -180,6 +138,59 @@ public class ZkCoordinator
     }
   }
 
+  private void childAdded(ChildData child)
+  {
+    segmentLoadUnloadService.submit(() -> {
+      final String path = child.getPath();
+      DataSegmentChangeRequest request = new SegmentChangeRequestNoop();
+      try {
+        final DataSegmentChangeRequest finalRequest = jsonMapper.readValue(
+            child.getData(),
+            DataSegmentChangeRequest.class
+        );
+
+        finalRequest.go(
+            dataSegmentChangeHandler,
+            new DataSegmentChangeCallback()
+            {
+              @Override
+              public void execute()
+              {
+                try {
+                  curator.delete().guaranteed().forPath(path);
+                  log.info("Completed request [%s]", finalRequest.asString());
+                }
+                catch (Exception e) {
+                  try {
+                    curator.delete().guaranteed().forPath(path);
+                  }
+                  catch (Exception e1) {
+                    log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
+                  }
+                  log.error(e, "Exception while removing zNode[%s]", path);
+                  throw new RuntimeException(e);
+                }
+              }
+            }
+        );
+      }
+      catch (Exception e) {
+        // Something went wrong either while deserializing the request using jsonMapper or while invoking it
+        try {
+          curator.delete().guaranteed().forPath(path);
+        }
+        catch (Exception e1) {
+          log.error(e1, "Failed to delete zNode[%s], but ignoring exception.", path);
+        }
+
+        log.makeAlert(e, "Segment load/unload: uncaught exception.")
+           .addData("node", path)
+           .addData("nodeProperties", request)
+           .emit();
+      }
+    });
+  }
+
   @LifecycleStop
   public void stop()
   {
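
The refactored ZkCoordinator above keeps the PathChildrenCache listener thin: on CHILD_ADDED it hands
deserialization and the load/drop work to an executor sized by numLoadingThreads instead of doing
everything inline on the cache's event thread. The standalone sketch below shows that offload pattern
with Curator against an embedded test ZooKeeper; it assumes the curator-recipes and curator-test
artifacts are on the classpath, and the paths and payloads are illustrative rather than Druid's real
load-queue layout.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.TestingServer;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch: offload CHILD_ADDED handling to a worker pool so the cache's event
// thread never blocks on the (potentially slow) load/drop work itself.
public class ChildAddedOffloadSketch
{
  public static void main(String[] args) throws Exception
  {
    try (TestingServer zk = new TestingServer()) {
      CuratorFramework curator =
          CuratorFrameworkFactory.newClient(zk.getConnectString(), new RetryOneTime(100));
      curator.start();
      curator.blockUntilConnected();

      final String loadQueuePath = "/loadQueue/host1";  // illustrative path
      curator.create().creatingParentsIfNeeded().forPath(loadQueuePath);

      final ExecutorService workers = Executors.newFixedThreadPool(4);
      final PathChildrenCache cache = new PathChildrenCache(curator, loadQueuePath, true);
      cache.getListenable().addListener((client, event) -> {
        if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
          final String path = event.getData().getPath();
          final byte[] payload = event.getData().getData();
          // Hand off to the pool; the real code would deserialize a
          // DataSegmentChangeRequest, run it, and then delete the zNode.
          workers.submit(() ->
              System.out.println("processing " + path + ": " + new String(payload, StandardCharsets.UTF_8))
          );
        }
      });
      cache.start();

      for (int i = 0; i < 3; i++) {
        curator.create().forPath(loadQueuePath + "/request-" + i, ("load segment " + i).getBytes(StandardCharsets.UTF_8));
      }

      // Crude wait for the cache to deliver events; fine for a demo.
      Thread.sleep(1000);
      workers.shutdown();
      workers.awaitTermination(5, TimeUnit.SECONDS);
      cache.close();
      curator.close();
    }
  }
}

In the real coordinator/historical exchange, the worker also deletes the zNode once the request
completes (or fails), which is what signals the coordinator-side peon that the request is done.
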
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
index a6b10bc..a4d5d94 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
@@ -21,36 +21,46 @@ package org.apache.druid.server.coordinator;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.server.coordination.DataSegmentChangeRequest;
 import org.apache.druid.server.coordination.SegmentChangeRequestDrop;
 import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
 import org.apache.druid.server.coordination.SegmentChangeRequestNoop;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.SegmentId;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Use {@link HttpLoadQueuePeon} instead.
+ * <p>
+ * Objects of this class can be accessed by multiple threads. State-wise, this class
+ * is thread-safe and callers of the public methods can expect thread-safe behavior.
+ * However, as with any object accessed by multiple threads, callers should not
+ * expect strict consistency in results between two calls of the same or
+ * different methods.
  */
 @Deprecated
 public class CuratorLoadQueuePeon extends LoadQueuePeon
@@ -59,40 +69,48 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
   private static final int DROP = 0;
   private static final int LOAD = 1;
 
-  private static void executeCallbacks(List<LoadPeonCallback> callbacks)
-  {
-    for (LoadPeonCallback callback : callbacks) {
-      if (callback != null) {
-        callback.execute();
-      }
-    }
-  }
-
   private final CuratorFramework curator;
   private final String basePath;
   private final ObjectMapper jsonMapper;
   private final ScheduledExecutorService processingExecutor;
+
+  /**
+   * Threadpool with daemon threads that execute callback actions associated
+   * with loading or dropping segments.
+   */
   private final ExecutorService callBackExecutor;
   private final DruidCoordinatorConfig config;
 
   private final AtomicLong queuedSize = new AtomicLong(0);
   private final AtomicInteger failedAssignCount = new AtomicInteger(0);
 
+  /**
+   * Needs to be thread safe since it can be concurrently accessed via
+   * {@link #loadSegment(DataSegment, LoadPeonCallback)}, {@link #actionCompleted(SegmentHolder)},
+   * {@link #getSegmentsToLoad()} and {@link #stop()}
+   */
   private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(
       DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
   );
+
+  /**
+   * Needs to be thread safe since it can be concurrently accessed via
+   * {@link #dropSegment(DataSegment, LoadPeonCallback)}, {@link #actionCompleted(SegmentHolder)},
+   * {@link #getSegmentsToDrop()} and {@link #stop()}
+   */
   private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(
       DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
   );
+
+  /**
+   * Needs to be thread safe since it can be concurrently accessed via
+   * {@link #markSegmentToDrop(DataSegment)}}, {@link #unmarkSegmentToDrop(DataSegment)}}
+   * and {@link #getSegmentsToDrop()}
+   */
   private final ConcurrentSkipListSet<DataSegment> segmentsMarkedToDrop = new ConcurrentSkipListSet<>(
       DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
   );
 
-  private final Object lock = new Object();
-
-  private volatile SegmentHolder currentlyProcessing = null;
-  private boolean stopped = false;
-
   CuratorLoadQueuePeon(
       CuratorFramework curator,
       String basePath,
@@ -150,61 +168,30 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
   }
 
   @Override
-  public void loadSegment(final DataSegment segment, final LoadPeonCallback callback)
+  public void loadSegment(final DataSegment segment, @Nullable final LoadPeonCallback callback)
   {
-    synchronized (lock) {
-      if ((currentlyProcessing != null) &&
-          currentlyProcessing.getSegmentId().equals(segment.getId())) {
-        if (callback != null) {
-          currentlyProcessing.addCallback(callback);
-        }
-        return;
-      }
-    }
-
-    synchronized (lock) {
-      final SegmentHolder existingHolder = segmentsToLoad.get(segment);
-      if (existingHolder != null) {
-        if ((callback != null)) {
-          existingHolder.addCallback(callback);
-        }
-        return;
-      }
+    SegmentHolder segmentHolder = new SegmentHolder(segment, LOAD, Collections.singletonList(callback));
+    final SegmentHolder existingHolder = segmentsToLoad.putIfAbsent(segment, segmentHolder);
+    if (existingHolder != null) {
+      existingHolder.addCallback(callback);
+      return;
     }
-
     log.debug("Asking server peon[%s] to load segment[%s]", basePath, segment.getId());
     queuedSize.addAndGet(segment.getSize());
-    segmentsToLoad.put(segment, new SegmentHolder(segment, LOAD, Collections.singletonList(callback)));
+    processingExecutor.submit(new SegmentChangeProcessor(segmentHolder));
   }
 
   @Override
-  public void dropSegment(
-      final DataSegment segment,
-      final LoadPeonCallback callback
-  )
+  public void dropSegment(final DataSegment segment, @Nullable final LoadPeonCallback callback)
   {
-    synchronized (lock) {
-      if ((currentlyProcessing != null) &&
-          currentlyProcessing.getSegmentId().equals(segment.getId())) {
-        if (callback != null) {
-          currentlyProcessing.addCallback(callback);
-        }
-        return;
-      }
-    }
-
-    synchronized (lock) {
-      final SegmentHolder existingHolder = segmentsToDrop.get(segment);
-      if (existingHolder != null) {
-        if (callback != null) {
-          existingHolder.addCallback(callback);
-        }
-        return;
-      }
+    SegmentHolder segmentHolder = new SegmentHolder(segment, DROP, Collections.singletonList(callback));
+    final SegmentHolder existingHolder = segmentsToDrop.putIfAbsent(segment, segmentHolder);
+    if (existingHolder != null) {
+      existingHolder.addCallback(callback);
+      return;
     }
-
     log.debug("Asking server peon[%s] to drop segment[%s]", basePath, segment.getId());
-    segmentsToDrop.put(segment, new SegmentHolder(segment, DROP, Collections.singletonList(callback)));
+    processingExecutor.submit(new SegmentChangeProcessor(segmentHolder));
   }
 
   @Override
@@ -219,206 +206,198 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
     segmentsMarkedToDrop.remove(dataSegment);
   }
 
-  private void processSegmentChangeRequest()
+  private class SegmentChangeProcessor implements Runnable
   {
-    if (currentlyProcessing != null) {
-      log.debug(
-          "Server[%s] skipping processSegmentChangeRequest because something is currently loading[%s].",
-          basePath,
-          currentlyProcessing.getSegmentId()
-      );
+    private final SegmentHolder segmentHolder;
 
-      return;
+    private SegmentChangeProcessor(SegmentHolder segmentHolder)
+    {
+      this.segmentHolder = segmentHolder;
     }
 
-    if (!segmentsToDrop.isEmpty()) {
-      currentlyProcessing = segmentsToDrop.firstEntry().getValue();
-      log.debug("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentId());
-    } else if (!segmentsToLoad.isEmpty()) {
-      currentlyProcessing = segmentsToLoad.firstEntry().getValue();
-      log.debug("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentId());
-    } else {
-      return;
+    @Override
+    public void run()
+    {
+      try {
+        final String path = ZKPaths.makePath(basePath, segmentHolder.getSegmentIdentifier());
+        final byte[] payload = jsonMapper.writeValueAsBytes(segmentHolder.getChangeRequest());
+        curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
+        log.debug(
+            "ZKNode created for server to [%s] %s [%s]",
+            basePath,
+            segmentHolder.getType() == LOAD ? "load" : "drop",
+            segmentHolder.getSegmentIdentifier()
+        );
+        final ScheduledFuture<?> nodeDeletedCheck = scheduleNodeDeletedCheck(path);
+        final Stat stat = curator.checkExists().usingWatcher(
+            (CuratorWatcher) watchedEvent -> {
+              switch (watchedEvent.getType()) {
+                case NodeDeleted:
+                  // Cancel the check node deleted task since we have already
+                  // been notified by the zk watcher
+                  nodeDeletedCheck.cancel(true);
+                  entryRemoved(segmentHolder, watchedEvent.getPath());
+                  break;
+                default:
+                  // do nothing
+              }
+            }
+        ).forPath(path);
+
+        if (stat == null) {
+          final byte[] noopPayload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop());
+
+          // Create a node and then delete it to remove the registered watcher.  This is a work-around for
+          // a zookeeper race condition.  Specifically, when you set a watcher, it fires on the next event
+          // that happens for that node.  If no events happen, the watcher stays registered forever.
+          // Couple that with the fact that you cannot set a watcher when you create a node, but what we
+          // want is to create a node and then watch for it to get deleted.  The solution is that you *can*
+          // set a watcher when you check to see if it exists, so we first create the node and then set a
+          // watcher on its existence.  However, if the node no longer exists by the time the existence check
+          // returns, then the watcher that was set will never fire (nobody will ever create the node
+          // again) and thus lead to a slow, but real, memory leak.  So, we create another node to cause
+          // that watcher to fire and delete it right away.
+          //
+          // We do not create the existence watcher first, because then it will fire when we create the
+          // node and we'll have the same race when trying to refresh that watcher.
+          curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload);
+          entryRemoved(segmentHolder, path);
+        }
+      }
+      catch (KeeperException.NodeExistsException ne) {
+        // This is expected when historicals haven't yet picked up processing this segment and coordinator
+        // tries reassigning it to the same node.
+        log.warn(ne, "ZK node already exists because segment change request hasn't yet been processed");
+        failAssign(segmentHolder);
+      }
+      catch (Exception e) {
+        failAssign(segmentHolder, e);
+      }
     }
 
-    try {
-      final String path = ZKPaths.makePath(basePath, currentlyProcessing.getSegmentId().toString());
-      final byte[] payload = jsonMapper.writeValueAsBytes(currentlyProcessing.getChangeRequest());
-      curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
-
-      processingExecutor.schedule(
+    @Nonnull
+    private ScheduledFuture<?> scheduleNodeDeletedCheck(String path)
+    {
+      return processingExecutor.schedule(
           () -> {
             try {
               if (curator.checkExists().forPath(path) != null) {
-                failAssign(new ISE("%s was never removed! Failing this operation!", path));
+                failAssign(segmentHolder, new ISE("%s was never removed! Failing this operation!", path));
+              } else {
+                log.debug("%s detected to be removed. ", path);
               }
             }
             catch (Exception e) {
-              failAssign(e);
+              log.error(e, "Exception caught and ignored when checking whether zk node was deleted");
+              failAssign(segmentHolder, e);
             }
           },
           config.getLoadTimeoutDelay().getMillis(),
           TimeUnit.MILLISECONDS
       );
-
-      final Stat stat = curator.checkExists().usingWatcher(
-          (CuratorWatcher) watchedEvent -> {
-            switch (watchedEvent.getType()) {
-              case NodeDeleted:
-                entryRemoved(watchedEvent.getPath());
-                break;
-              default:
-                // do nothing
-            }
-          }
-      ).forPath(path);
-
-      if (stat == null) {
-        final byte[] noopPayload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop());
-
-        // Create a node and then delete it to remove the registered watcher.  This is a work-around for
-        // a zookeeper race condition.  Specifically, when you set a watcher, it fires on the next event
-        // that happens for that node.  If no events happen, the watcher stays registered foreverz.
-        // Couple that with the fact that you cannot set a watcher when you create a node, but what we
-        // want is to create a node and then watch for it to get deleted.  The solution is that you *can*
-        // set a watcher when you check to see if it exists so, we first create the node and then set a
-        // watcher on its existence.  However, if already does not exist by the time the existence check
-        // returns, then the watcher that was set will never fire (nobody will ever create the node
-        // again) and thus lead to a slow, but real, memory leak.  So, we create another node to cause
-        // that watcher to fire and delete it right away.
-        //
-        // We do not create the existence watcher first, because then it will fire when we create the
-        // node and we'll have the same race when trying to refresh that watcher.
-        curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload);
-
-        entryRemoved(path);
-      }
-    }
-    catch (Exception e) {
-      failAssign(e);
     }
   }
 
-  private void actionCompleted()
+  private void actionCompleted(SegmentHolder segmentHolder)
   {
-    if (currentlyProcessing != null) {
-      switch (currentlyProcessing.getType()) {
-        case LOAD:
-          segmentsToLoad.remove(currentlyProcessing.getSegment());
-          queuedSize.addAndGet(-currentlyProcessing.getSegmentSize());
-          break;
-        case DROP:
-          segmentsToDrop.remove(currentlyProcessing.getSegment());
-          break;
-        default:
-          throw new UnsupportedOperationException();
-      }
-
-      final List<LoadPeonCallback> callbacks = currentlyProcessing.getCallbacks();
-      currentlyProcessing = null;
-      callBackExecutor.execute(
-          () -> executeCallbacks(callbacks)
-      );
+    switch (segmentHolder.getType()) {
+      case LOAD:
+        segmentsToLoad.remove(segmentHolder.getSegment());
+        queuedSize.addAndGet(-segmentHolder.getSegmentSize());
+        break;
+      case DROP:
+        segmentsToDrop.remove(segmentHolder.getSegment());
+        break;
+      default:
+        throw new UnsupportedOperationException();
     }
+    executeCallbacks(segmentHolder);
   }
 
+
   @Override
   public void start()
-  {
-    ScheduledExecutors.scheduleAtFixedRate(
-        processingExecutor,
-        config.getLoadQueuePeonRepeatDelay(),
-        config.getLoadQueuePeonRepeatDelay(),
-        () -> {
-          processSegmentChangeRequest();
-
-          if (stopped) {
-            return ScheduledExecutors.Signal.STOP;
-          } else {
-            return ScheduledExecutors.Signal.REPEAT;
-          }
-        }
-    );
-  }
+  { }
 
   @Override
   public void stop()
   {
-    synchronized (lock) {
-      if (currentlyProcessing != null) {
-        executeCallbacks(currentlyProcessing.getCallbacks());
-        currentlyProcessing = null;
-      }
-
-      if (!segmentsToDrop.isEmpty()) {
-        for (SegmentHolder holder : segmentsToDrop.values()) {
-          executeCallbacks(holder.getCallbacks());
-        }
-      }
-      segmentsToDrop.clear();
-
-      if (!segmentsToLoad.isEmpty()) {
-        for (SegmentHolder holder : segmentsToLoad.values()) {
-          executeCallbacks(holder.getCallbacks());
-        }
-      }
-      segmentsToLoad.clear();
+    for (SegmentHolder holder : segmentsToDrop.values()) {
+      executeCallbacks(holder);
+    }
+    segmentsToDrop.clear();
 
-      queuedSize.set(0L);
-      failedAssignCount.set(0);
-      stopped = true;
+    for (SegmentHolder holder : segmentsToLoad.values()) {
+      executeCallbacks(holder);
     }
+    segmentsToLoad.clear();
+
+    queuedSize.set(0L);
+    failedAssignCount.set(0);
+    processingExecutor.shutdown();
+    callBackExecutor.shutdown();
   }
 
-  private void entryRemoved(String path)
+  private void entryRemoved(SegmentHolder segmentHolder, String path)
   {
-    synchronized (lock) {
-      if (currentlyProcessing == null) {
-        log.warn("Server[%s] an entry[%s] was removed even though it wasn't loading!?", basePath, path);
-        return;
-      }
-      if (!ZKPaths.getNodeFromPath(path).equals(currentlyProcessing.getSegmentId().toString())) {
-        log.warn(
-            "Server[%s] entry [%s] was removed even though it's not what is currently loading[%s]",
-            basePath, path, currentlyProcessing
-        );
-        return;
-      }
-      log.debug(
-          "Server[%s] done processing %s of segment [%s]",
-          basePath,
-          currentlyProcessing.getType() == LOAD ? "load" : "drop",
-          path
+    if (!ZKPaths.getNodeFromPath(path).equals(segmentHolder.getSegmentIdentifier())) {
+      log.warn(
+          "Server[%s] entry [%s] was removed even though it's not what is currently loading[%s]",
+          basePath, path, segmentHolder
       );
-      actionCompleted();
+      return;
     }
+    actionCompleted(segmentHolder);
+    log.debug(
+        "Server[%s] done processing %s of segment [%s]",
+        basePath,
+        segmentHolder.getType() == LOAD ? "load" : "drop",
+        path
+    );
   }
 
-  private void failAssign(Exception e)
+  private void failAssign(SegmentHolder segmentHolder)
   {
-    synchronized (lock) {
-      log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyProcessing);
-      failedAssignCount.getAndIncrement();
-      // Act like it was completed so that the coordinator gives it to someone else
-      actionCompleted();
+    failAssign(segmentHolder, null);
+  }
+
+  private void failAssign(SegmentHolder segmentHolder, Exception e)
+  {
+    if (e != null) {
+      log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, segmentHolder);
     }
+    failedAssignCount.getAndIncrement();
+    // Act like it was completed so that the coordinator gives it to someone else
+    actionCompleted(segmentHolder);
   }
 
+
   private static class SegmentHolder
   {
     private final DataSegment segment;
     private final DataSegmentChangeRequest changeRequest;
     private final int type;
+    // Guaranteed to store only non-null elements
     private final List<LoadPeonCallback> callbacks = new ArrayList<>();
 
-    private SegmentHolder(DataSegment segment, int type, Collection<LoadPeonCallback> callbacks)
+    private SegmentHolder(
+        DataSegment segment,
+        int type,
+        Collection<LoadPeonCallback> callbacksParam
+    )
     {
       this.segment = segment;
       this.type = type;
       this.changeRequest = (type == LOAD)
                            ? new SegmentChangeRequestLoad(segment)
                            : new SegmentChangeRequestDrop(segment);
-      this.callbacks.addAll(callbacks);
+      Iterator<LoadPeonCallback> itr = callbacksParam.iterator();
+      while (itr.hasNext()) {
+        LoadPeonCallback c = itr.next();
+        if (c != null) {
+          callbacks.add(c);
+        }
+      }
     }
 
     public DataSegment getSegment()
@@ -431,9 +410,9 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
       return type;
     }
 
-    public SegmentId getSegmentId()
+    public String getSegmentIdentifier()
     {
-      return segment.getId();
+      return segment.getId().toString();
     }
 
     public long getSegmentSize()
@@ -441,24 +420,20 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
       return segment.getSize();
     }
 
-    public void addCallbacks(Collection<LoadPeonCallback> newCallbacks)
+    public void addCallback(@Nullable LoadPeonCallback newCallback)
     {
-      synchronized (callbacks) {
-        callbacks.addAll(newCallbacks);
-      }
-    }
-
-    public void addCallback(LoadPeonCallback newCallback)
-    {
-      synchronized (callbacks) {
-        callbacks.add(newCallback);
+      if (newCallback != null) {
+        synchronized (callbacks) {
+          callbacks.add(newCallback);
+        }
       }
     }
 
-    public List<LoadPeonCallback> getCallbacks()
+    List<LoadPeonCallback> snapshotCallbacks()
     {
       synchronized (callbacks) {
-        return callbacks;
+        // Return an immutable copy so that callers don't have to worry about concurrent modification
+        return ImmutableList.copyOf(callbacks);
       }
     }
 
@@ -473,4 +448,11 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
       return changeRequest.toString();
     }
   }
+
+  private void executeCallbacks(SegmentHolder holder)
+  {
+    for (LoadPeonCallback callback : holder.snapshotCallbacks()) {
+      callBackExecutor.submit(() -> callback.execute());
+    }
+  }
 }
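
The thread-safety notes in the javadoc above come down to two idioms: de-duplicate concurrent requests
for the same segment with putIfAbsent on a concurrent map, and attach late callbacks to the holder that
is already queued. The sketch below is a hedged, standalone rendering of that idiom with hypothetical
Holder and Callback types; it is not the Druid classes themselves.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch of the putIfAbsent de-duplication used by loadSegment()/dropSegment():
// only the first caller for a given key schedules work; later callers just
// register their callbacks on the existing holder.
public class PutIfAbsentDedupSketch
{
  interface Callback
  {
    void execute();
  }

  static final class Holder
  {
    private final List<Callback> callbacks = new ArrayList<>();

    synchronized void addCallback(Callback c)
    {
      if (c != null) {
        callbacks.add(c);
      }
    }

    synchronized List<Callback> snapshotCallbacks()
    {
      return new ArrayList<>(callbacks);
    }
  }

  private final ConcurrentMap<String, Holder> pending = new ConcurrentHashMap<>();

  /** Returns true if this call actually scheduled new work. */
  boolean request(String segmentId, Callback callback)
  {
    Holder fresh = new Holder();
    fresh.addCallback(callback);
    Holder existing = pending.putIfAbsent(segmentId, fresh);
    if (existing != null) {
      // Someone already queued this segment; just piggyback the callback.
      existing.addCallback(callback);
      return false;
    }
    // First request wins: in the real peon this is where the
    // SegmentChangeProcessor task would be submitted to the executor.
    return true;
  }

  void complete(String segmentId)
  {
    Holder done = pending.remove(segmentId);
    if (done != null) {
      for (Callback c : done.snapshotCallbacks()) {
        c.execute();
      }
    }
  }

  public static void main(String[] args)
  {
    PutIfAbsentDedupSketch peon = new PutIfAbsentDedupSketch();
    System.out.println(peon.request("seg-1", () -> System.out.println("first callback")));
    System.out.println(peon.request("seg-1", () -> System.out.println("second callback")));
    peon.complete("seg-1");
  }
}

The same shape is why the new stop() can simply iterate the maps, run the callbacks and clear them: the
concurrent maps are the single source of truth for pending work.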
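
The long comment inside SegmentChangeProcessor.run() describes a create-then-watch handshake: create the
ephemeral request node, attach an existence watcher afterwards (ZooKeeper cannot attach one at create
time), and if the node is already gone when the check returns, create and clean up a throwaway node so
the stranded watcher fires instead of leaking. The sketch below reconstructs that handshake against an
embedded test ZooKeeper; it assumes curator-recipes and curator-test are on the classpath, error handling
is trimmed, and the paths and payloads are illustrative.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Standalone reconstruction of the create-then-watch-for-delete handshake,
// including the noop-node trick used to fire a watcher that would otherwise leak.
public class CreateThenWatchSketch
{
  public static void main(String[] args) throws Exception
  {
    try (TestingServer zk = new TestingServer()) {
      CuratorFramework curator =
          CuratorFrameworkFactory.newClient(zk.getConnectString(), new RetryOneTime(100));
      curator.start();
      curator.blockUntilConnected();
      curator.create().creatingParentsIfNeeded().forPath("/loadQueue/host1");

      final String path = "/loadQueue/host1/segment-to-load";  // illustrative
      final CountDownLatch removed = new CountDownLatch(1);

      // 1. Create the ephemeral request node the "historical" will act on.
      curator.create().withMode(CreateMode.EPHEMERAL)
             .forPath(path, "load request".getBytes(StandardCharsets.UTF_8));

      // 2. Watch for its deletion; the watcher is attached via checkExists()
      //    because it cannot be attached at create time.
      Stat stat = curator.checkExists().usingWatcher(
          (CuratorWatcher) event -> {
            if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
              System.out.println("request node deleted: " + event.getPath());
              removed.countDown();
            }
          }
      ).forPath(path);

      if (stat == null) {
        // 3. The node vanished before the watcher was attached (cannot happen in
        //    this demo, but can in the real race). Create and delete a throwaway
        //    node so the stranded watcher fires and is discharged instead of
        //    leaking, then treat the request as already completed.
        curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, new byte[0]);
        curator.delete().guaranteed().forPath(path);
        removed.countDown();
      } else {
        // 4. Simulate the historical finishing the work and deleting the node;
        //    this is what makes the NodeDeleted watcher fire.
        curator.delete().guaranteed().forPath(path);
      }

      System.out.println("watcher fired: " + removed.await(10, TimeUnit.SECONDS));
      curator.close();
    }
  }
}

In the actual processor this handshake is paired with scheduleNodeDeletedCheck(): a ScheduledFuture that
fails the assignment if the node is still present after the configured load timeout
(config.getLoadTimeoutDelay()), and which the NodeDeleted watcher cancels when the historical finishes
first.
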
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
index 32eb87c..b4adebe 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
@@ -75,6 +75,12 @@ public abstract class DruidCoordinatorConfig
     return "curator";
   }
 
+  @Config("druid.coordinator.curator.loadqueuepeon.numCallbackThreads")
+  public int getNumCuratorCallBackThreads()
+  {
+    return 2;
+  }
+
   @Config("druid.coordinator.loadqueuepeon.http.repeatDelay")
   public Duration getHttpLoadQueuePeonRepeatDelay()
   {
diff --git a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
index 1be65cb..9bdd84e 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/ZkCoordinatorTest.java
@@ -133,7 +133,8 @@ public class ZkCoordinatorTest extends CuratorTestBase
         jsonMapper,
         zkPaths,
         me,
-        curator
+        curator,
+        new SegmentLoaderConfig()
     );
     zkCoordinator.start();
 
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index 25dce8c..3b22235 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -162,7 +162,8 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
         null,
         false,
         false,
-        new Duration("PT0s")
+        new Duration("PT0s"),
+        Duration.millis(10)
     );
     sourceLoadQueueChildrenCache = new PathChildrenCache(
         curator,
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 1f4f7e2..793bd28 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -139,7 +139,8 @@ public class DruidCoordinatorTest extends CuratorTestBase
         null,
         false,
         false,
-        new Duration("PT0s")
+        new Duration("PT0s"),
+        Duration.millis(10)
     );
     pathChildrenCache = new PathChildrenCache(
         curator,
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
index 0f73695..894472a 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
@@ -84,7 +84,8 @@ public class HttpLoadQueuePeonTest
       null,
       false,
       false,
-      Duration.ZERO
+      Duration.ZERO,
+      Duration.millis(10)
   )
   {
     @Override
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
index 437b2e4..8d8271d 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
@@ -39,6 +39,7 @@ import org.apache.druid.server.coordination.DataSegmentChangeRequest;
 import org.apache.druid.server.coordination.SegmentChangeRequestDrop;
 import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.Duration;
 import org.junit.After;
@@ -47,8 +48,10 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.TimeUnit;
 
 public class LoadQueuePeonTest extends CuratorTestBase
 {
@@ -79,46 +82,34 @@ public class LoadQueuePeonTest extends CuratorTestBase
   @Test
   public void testMultipleLoadDropSegments() throws Exception
   {
-    final AtomicInteger requestSignalIdx = new AtomicInteger(0);
-    final AtomicInteger segmentSignalIdx = new AtomicInteger(0);
-
     loadQueuePeon = new CuratorLoadQueuePeon(
         curator,
         LOAD_QUEUE_PATH,
         jsonMapper,
         Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"),
         Execs.singleThreaded("test_load_queue_peon-%d"),
-        new TestDruidCoordinatorConfig(null, null, null, null, null, null, 10, null, false, false, Duration.ZERO)
+        new TestDruidCoordinatorConfig(
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            10,
+            null,
+            false,
+            false,
+            Duration.millis(0),
+            Duration.millis(10)
+        )
     );
 
     loadQueuePeon.start();
 
-    final CountDownLatch[] loadRequestSignal = new CountDownLatch[5];
-    final CountDownLatch[] dropRequestSignal = new CountDownLatch[5];
-    final CountDownLatch[] segmentLoadedSignal = new CountDownLatch[5];
-    final CountDownLatch[] segmentDroppedSignal = new CountDownLatch[5];
-
-    for (int i = 0; i < 5; ++i) {
-      loadRequestSignal[i] = new CountDownLatch(1);
-      dropRequestSignal[i] = new CountDownLatch(1);
-      segmentLoadedSignal[i] = new CountDownLatch(1);
-      segmentDroppedSignal[i] = new CountDownLatch(1);
-    }
-
-    final DataSegmentChangeHandler handler = new DataSegmentChangeHandler()
-    {
-      @Override
-      public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
-      {
-        loadRequestSignal[requestSignalIdx.get()].countDown();
-      }
-
-      @Override
-      public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback)
-      {
-        dropRequestSignal[requestSignalIdx.get()].countDown();
-      }
-    };
+    ConcurrentMap<SegmentId, CountDownLatch> loadRequestSignals = new ConcurrentHashMap<>(5);
+    ConcurrentMap<SegmentId, CountDownLatch> dropRequestSignals = new ConcurrentHashMap<>(5);
+    ConcurrentMap<SegmentId, CountDownLatch> segmentLoadedSignals = new ConcurrentHashMap<>(5);
+    ConcurrentMap<SegmentId, CountDownLatch> segmentDroppedSignals = new ConcurrentHashMap<>(5);
 
     final List<DataSegment> segmentToDrop = Lists.transform(
         ImmutableList.of(
@@ -132,11 +123,24 @@ public class LoadQueuePeonTest extends CuratorTestBase
           @Override
           public DataSegment apply(String intervalStr)
           {
-            return dataSegmentWithInterval(intervalStr);
+            DataSegment dataSegment = dataSegmentWithInterval(intervalStr);
+            return dataSegment;
           }
         }
     );
 
+    final CountDownLatch[] dropRequestLatches = new CountDownLatch[5];
+    final CountDownLatch[] dropSegmentLatches = new CountDownLatch[5];
+    for (int i = 0; i < 5; i++) {
+      dropRequestLatches[i] = new CountDownLatch(1);
+      dropSegmentLatches[i] = new CountDownLatch(1);
+    }
+    int i = 0;
+    for (DataSegment s : segmentToDrop) {
+      dropRequestSignals.put(s.getId(), dropRequestLatches[i]);
+      segmentDroppedSignals.put(s.getId(), dropSegmentLatches[i++]);
+    }
+
     final List<DataSegment> segmentToLoad = Lists.transform(
         ImmutableList.of(
             "2014-10-27T00:00:00Z/P1D",
@@ -149,11 +153,26 @@ public class LoadQueuePeonTest extends CuratorTestBase
           @Override
           public DataSegment apply(String intervalStr)
           {
-            return dataSegmentWithInterval(intervalStr);
+            DataSegment dataSegment = dataSegmentWithInterval(intervalStr);
+            loadRequestSignals.put(dataSegment.getId(), new CountDownLatch(1));
+            segmentLoadedSignals.put(dataSegment.getId(), new CountDownLatch(1));
+            return dataSegment;
           }
         }
     );
 
+    final CountDownLatch[] loadRequestLatches = new CountDownLatch[5];
+    final CountDownLatch[] segmentLoadedLatches = new CountDownLatch[5];
+    for (i = 0; i < 5; i++) {
+      loadRequestLatches[i] = new CountDownLatch(1);
+      segmentLoadedLatches[i] = new CountDownLatch(1);
+    }
+    i = 0;
+    for (DataSegment s : segmentToDrop) {
+      loadRequestSignals.put(s.getId(), loadRequestLatches[i]);
+      segmentLoadedSignals.put(s.getId(), segmentLoadedLatches[i++]);
+    }
+
     // segment with latest interval should be loaded first
     final List<DataSegment> expectedLoadOrder = Lists.transform(
         ImmutableList.of(
@@ -162,59 +181,48 @@ public class LoadQueuePeonTest extends CuratorTestBase
             "2014-10-30T00:00:00Z/P1D",
             "2014-10-28T00:00:00Z/P1D",
             "2014-10-27T00:00:00Z/P1D"
-        ), new Function<String, DataSegment>()
-        {
-          @Override
-          public DataSegment apply(String intervalStr)
-          {
-            return dataSegmentWithInterval(intervalStr);
-          }
-        }
+        ), intervalStr -> dataSegmentWithInterval(intervalStr)
     );
 
+    final DataSegmentChangeHandler handler = new DataSegmentChangeHandler()
+    {
+      @Override
+      public void addSegment(DataSegment segment, DataSegmentChangeCallback callback)
+      {
+        loadRequestSignals.get(segment.getId()).countDown();
+      }
+
+      @Override
+      public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback)
+      {
+        dropRequestSignals.get(segment.getId()).countDown();
+      }
+    };
+
     loadQueueCache.getListenable().addListener(
-        new PathChildrenCacheListener()
-        {
-          @Override
-          public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
-          {
-            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
-              DataSegmentChangeRequest request = jsonMapper.readValue(
-                  event.getData().getData(),
-                  DataSegmentChangeRequest.class
-              );
-              request.go(handler, null);
-            }
+        (client, event) -> {
+          if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
+            DataSegmentChangeRequest request = jsonMapper.readValue(
+                event.getData().getData(),
+                DataSegmentChangeRequest.class
+            );
+            request.go(handler, null);
           }
         }
     );
     loadQueueCache.start();
 
-    for (DataSegment segment : segmentToDrop) {
+    for (final DataSegment segment : segmentToDrop) {
       loadQueuePeon.dropSegment(
           segment,
-          new LoadPeonCallback()
-          {
-            @Override
-            public void execute()
-            {
-              segmentDroppedSignal[segmentSignalIdx.get()].countDown();
-            }
-          }
+          () -> segmentDroppedSignals.get(segment.getId()).countDown()
       );
     }
 
-    for (DataSegment segment : segmentToLoad) {
+    for (final DataSegment segment : segmentToLoad) {
       loadQueuePeon.loadSegment(
           segment,
-          new LoadPeonCallback()
-          {
-            @Override
-            public void execute()
-            {
-              segmentLoadedSignal[segmentSignalIdx.get()].countDown();
-            }
-          }
+          () -> segmentLoadedSignals.get(segment.getId()).countDown()
       );
     }
 
@@ -224,8 +232,14 @@ public class LoadQueuePeonTest extends CuratorTestBase
 
     for (DataSegment segment : segmentToDrop) {
       String dropRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getId().toString());
-      Assert.assertTrue(timing.forWaiting().awaitLatch(dropRequestSignal[requestSignalIdx.get()]));
-      Assert.assertNotNull(curator.checkExists().forPath(dropRequestPath));
+      Assert.assertTrue(
+          "Latch not counted down for " + dropRequestSignals.get(segment.getId()),
+          dropRequestSignals.get(segment.getId()).await(10, TimeUnit.SECONDS)
+      );
+      Assert.assertNotNull(
+          "Path " + dropRequestPath + " doesn't exist",
+          curator.checkExists().forPath(dropRequestPath)
+      );
       Assert.assertEquals(
           segment,
           ((SegmentChangeRequestDrop) jsonMapper.readValue(
@@ -235,29 +249,14 @@ public class LoadQueuePeonTest extends CuratorTestBase
           )).getSegment()
       );
 
-      if (requestSignalIdx.get() == 4) {
-        requestSignalIdx.set(0);
-      } else {
-        requestSignalIdx.incrementAndGet();
-      }
-
       // simulate completion of drop request by historical
       curator.delete().guaranteed().forPath(dropRequestPath);
-      Assert.assertTrue(timing.forWaiting().awaitLatch(segmentDroppedSignal[segmentSignalIdx.get()]));
-
-      int expectedNumSegmentToDrop = 5 - segmentSignalIdx.get() - 1;
-      Assert.assertEquals(expectedNumSegmentToDrop, loadQueuePeon.getSegmentsToDrop().size());
-
-      if (segmentSignalIdx.get() == 4) {
-        segmentSignalIdx.set(0);
-      } else {
-        segmentSignalIdx.incrementAndGet();
-      }
+      Assert.assertTrue(timing.forWaiting().awaitLatch(segmentDroppedSignals.get(segment.getId())));
     }
 
     for (DataSegment segment : expectedLoadOrder) {
       String loadRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getId().toString());
-      Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestSignal[requestSignalIdx.get()]));
+      Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestSignals.get(segment.getId())));
       Assert.assertNotNull(curator.checkExists().forPath(loadRequestPath));
       Assert.assertEquals(
           segment,
@@ -266,16 +265,9 @@ public class LoadQueuePeonTest extends CuratorTestBase
               .getSegment()
       );
 
-      requestSignalIdx.incrementAndGet();
-
       // simulate completion of load request by historical
       curator.delete().guaranteed().forPath(loadRequestPath);
-      Assert.assertTrue(timing.forWaiting().awaitLatch(segmentLoadedSignal[segmentSignalIdx.get()]));
-
-      int expectedNumSegmentToLoad = 5 - segmentSignalIdx.get() - 1;
-      Assert.assertEquals(1200 * expectedNumSegmentToLoad, loadQueuePeon.getLoadQueueSize());
-      Assert.assertEquals(expectedNumSegmentToLoad, loadQueuePeon.getSegmentsToLoad().size());
-      segmentSignalIdx.incrementAndGet();
+      Assert.assertTrue(timing.forWaiting().awaitLatch(segmentLoadedSignals.get(segment.getId())));
     }
   }
 
@@ -294,7 +286,20 @@ public class LoadQueuePeonTest extends CuratorTestBase
         Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"),
         Execs.singleThreaded("test_load_queue_peon-%d"),
         // set time-out to 1 ms so that LoadQueuePeon will fail the assignment quickly
-        new TestDruidCoordinatorConfig(null, null, null, new Duration(1), null, null, 10, null, false, false, new Duration("PT1s"))
+        new TestDruidCoordinatorConfig(
+            null,
+            null,
+            null,
+            new Duration(1),
+            null,
+            null,
+            10,
+            null,
+            false,
+            false,
+            new Duration("PT1s"),
+            Duration.millis(10)
+        )
     );
 
     loadQueuePeon.start();
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
index e07f363..c979671 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTester.java
@@ -19,7 +19,9 @@
 
 package org.apache.druid.server.coordinator;
 
+import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Duration;
 
 import java.util.concurrent.ConcurrentSkipListSet;
 
@@ -29,7 +31,27 @@ public class LoadQueuePeonTester extends CuratorLoadQueuePeon
 
   public LoadQueuePeonTester()
   {
-    super(null, null, null, null, null, null);
+    super(
+        null,
+        null,
+        null,
+        Execs.scheduledSingleThreaded("LoadQueuePeonTester--%d"),
+        null,
+        new TestDruidCoordinatorConfig(
+            null,
+            null,
+            null,
+            new Duration(1),
+            null,
+            null,
+            10,
+            null,
+            false,
+            false,
+            new Duration("PT1s"),
+            Duration.millis(10)
+        )
+    );
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
index 317f2fe..e1b9135 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/TestDruidCoordinatorConfig.java
@@ -46,7 +46,8 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
       String consoleStatic,
       boolean mergeSegments,
       boolean convertSegments,
-      Duration getLoadQueuePeonRepeatDelay
+      Duration getLoadQueuePeonRepeatDelay,
+      Duration CuratorCreateZkNodesRepeatDelay
   )
   {
     this.coordinatorStartDelay = coordinatorStartDelay;
@@ -108,8 +109,10 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig
     return consoleStatic;
   }
 
-  @Override public Duration getLoadQueuePeonRepeatDelay()
+  @Override
+  public Duration getLoadQueuePeonRepeatDelay()
   {
     return getLoadQueuePeonRepeatDelay;
   }
+
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentKillerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentKillerTest.java
index 8593e30..7ee58a2 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentKillerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentKillerTest.java
@@ -113,7 +113,8 @@ public class DruidCoordinatorSegmentKillerTest
             null,
             false,
             false,
-            Duration.ZERO
+            Duration.ZERO,
+            Duration.millis(10)
         )
     );
 
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index 4097e95..8824562 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -46,6 +46,7 @@ import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.guice.annotations.CoordinatorIndexingServiceHelper;
 import org.apache.druid.guice.annotations.EscalatedGlobal;
 import org.apache.druid.guice.http.JettyHttpClientModule;
+import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.http.client.HttpClient;
@@ -90,7 +91,7 @@ import org.eclipse.jetty.server.Server;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
 
 /**
  */
@@ -249,11 +250,19 @@ public class CliCoordinator extends ServerRunnable
               ZkPathsConfig zkPaths
           )
           {
+            boolean useHttpLoadQueuePeon = "http".equalsIgnoreCase(config.getLoadQueuePeonType());
+            ExecutorService callBackExec;
+            if (useHttpLoadQueuePeon) {
+              callBackExec = Execs.singleThreaded("LoadQueuePeon-callbackexec--%d");
+            } else {
+              callBackExec = Execs.multiThreaded(config.getNumCuratorCallBackThreads(), "LoadQueuePeon"
+                                                                                        + "-callbackexec--%d");
+            }
             return new LoadQueueTaskMaster(
                 curator,
                 jsonMapper,
                 factory.create(1, "Master-PeonExec--%d"),
-                Executors.newSingleThreadExecutor(),
+                callBackExec,
                 config,
                 httpClient,
                 zkPaths

