You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2019/01/16 03:14:41 UTC

[incubator-druid] Diff for: [GitHub] QiuMM closed pull request #6775: make thread pool size can be configured for loadqueuepeon

diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md
index 92942a068cf..6242eced1f2 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -694,6 +694,7 @@ These coordinator static configurations can be defined in the `coordinator/runti
 |`druid.coordinator.kill.maxSegments`|Kill at most n segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on. Note that default value is invalid.|0|
 |`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy that the coordinator should use to distribute segments among the historicals. `cachingCost` is logically equivalent to `cost` but is more CPU-efficient on large clusters and will replace `cost` in the future versions, users are invited to try it. Use `diskNormalized` to distribute segments among nodes so that the disks fill up uniformly and use `random` to randomly pick nodes to distribute segments.|`cost`|
 |`druid.coordinator.loadqueuepeon.repeatDelay`|The start and repeat delay for the loadqueuepeon , which manages the load and drop of segments.|PT0.050S (50 ms)|
+|`druid.coordinator.loadqueuepeon.threadPoolSize`|How many threads to use for the loadqueuepeon to manage the load and drop of segments.|1|
 |`druid.coordinator.asOverlord.enabled`|Boolean value for whether this coordinator node should act like an overlord as well. This configuration allows users to simplify a druid cluster by not having to deploy any standalone overlord nodes. If set to true, then overlord console is available at `http://coordinator-host:port/console.html` and be sure to set `druid.coordinator.asOverlord.overlordService` also. See next.|false|
 |`druid.coordinator.asOverlord.overlordService`| Required, if `druid.coordinator.asOverlord.enabled` is `true`. This must be same value as `druid.service` on standalone Overlord nodes and `druid.selectors.indexing.serviceName` on Middle Managers.|NULL|
 
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
index e0972260ac9..a0e444f4415 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
@@ -223,24 +223,26 @@ public void unmarkSegmentToDrop(DataSegment dataSegment)
 
   private void processSegmentChangeRequest()
   {
-    if (currentlyProcessing != null) {
-      log.debug(
-          "Server[%s] skipping processSegmentChangeRequest because something is currently loading[%s].",
-          basePath,
-          currentlyProcessing.getSegmentIdentifier()
-      );
+    synchronized (lock) {
+      if (currentlyProcessing != null) {
+        log.debug(
+            "Server[%s] skipping processSegmentChangeRequest because something is currently loading[%s].",
+            basePath,
+            currentlyProcessing.getSegmentIdentifier()
+        );
 
-      return;
-    }
+        return;
+      }
 
-    if (!segmentsToDrop.isEmpty()) {
-      currentlyProcessing = segmentsToDrop.firstEntry().getValue();
-      log.debug("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
-    } else if (!segmentsToLoad.isEmpty()) {
-      currentlyProcessing = segmentsToLoad.firstEntry().getValue();
-      log.debug("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
-    } else {
-      return;
+      if (!segmentsToDrop.isEmpty()) {
+        currentlyProcessing = segmentsToDrop.firstEntry().getValue();
+        log.debug("Server[%s] dropping [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
+      } else if (!segmentsToLoad.isEmpty()) {
+        currentlyProcessing = segmentsToLoad.firstEntry().getValue();
+        log.debug("Server[%s] loading [%s]", basePath, currentlyProcessing.getSegmentIdentifier());
+      } else {
+        return;
+      }
     }
 
     try {
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index 7f36e0cb606..212a752cf5c 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -233,10 +233,14 @@ public LoadQueueTaskMaster getLoadQueueTaskMaster(
               ZkPathsConfig zkPaths
           )
           {
+            int poolSize = Integer.parseInt(properties.getProperty(
+                "druid.coordinator.loadqueuepeon.threadPoolSize",
+                "1"
+            ));
             return new LoadQueueTaskMaster(
                 curator,
                 jsonMapper,
-                factory.create(1, "Master-PeonExec--%d"),
+                factory.create(poolSize, "Master-PeonExec--%d"),
                 Executors.newSingleThreadExecutor(),
                 config,
                 httpClient,


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org