You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/11 09:58:54 UTC

[lucene-solr] 01/02: @502 Executor tweaks.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit f3ca929a16677520a2ec2671c1ca1c3b828553da
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Tue Aug 11 04:46:50 2020 -0500

    @502 Executor tweaks.
---
 gradle/testing/defaults-tests.gradle               |  9 +-
 .../apache/solr/cloud/OverseerTaskProcessor.java   |  5 +-
 .../OverseerCollectionMessageHandler.java          | 10 +--
 .../handler/component/HttpShardHandlerFactory.java |  4 +-
 .../component/SolrExecutorCompletionService.java   | 95 ++++++++++++++++++++++
 .../processor/DistributedZkUpdateProcessor.java    |  2 +-
 .../src/java/org/apache/solr/common/ParWork.java   |  2 +-
 .../org/apache/solr/common/ParWorkExecService.java | 10 +++
 .../apache/solr/common/cloud/ZkConfigManager.java  | 17 +++-
 9 files changed, 131 insertions(+), 23 deletions(-)

diff --git a/gradle/testing/defaults-tests.gradle b/gradle/testing/defaults-tests.gradle
index 6d77123..6f62865 100644
--- a/gradle/testing/defaults-tests.gradle
+++ b/gradle/testing/defaults-tests.gradle
@@ -68,14 +68,7 @@ allprojects {
       minHeapSize = propertyOrDefault("tests.minheapsize", "256m")
       maxHeapSize = propertyOrDefault("tests.heapsize", "512m")
 
-      int apc;
-      if (maxParallelForks > 1) {
-        apc = (int) Math.max(1, Runtime.runtime.availableProcessors() / 3.0d);
-      } else {
-        apc = Runtime.runtime.availableProcessors();
-      }
-
-      jvmArgs Commandline.translateCommandline(propertyOrDefault("tests.jvmargs", "-XX:TieredStopAtLevel=1 -XX:+UseParallelGC -XX:-UseBiasedLocking -XX:ActiveProcessorCount=" + apc));
+      jvmArgs Commandline.translateCommandline(propertyOrDefault("tests.jvmargs", "-XX:TieredStopAtLevel=1 -XX:+UseParallelGC -XX:-UseBiasedLocking"));
 
       systemProperty 'java.util.logging.config.file', file("${commonDir}/tools/junit4/logging.properties")
       systemProperty 'java.awt.headless', 'true'
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index 2a3d1fe..da36776 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.ParWork;
+import org.apache.solr.common.ParWorkExecService;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
@@ -198,7 +199,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
 
           while (runningTasksSize() > MAX_PARALLEL_TASKS) {
             synchronized (waitLock) {
-              waitLock.wait(1000);//wait for 100 ms or till a task is complete
+              waitLock.wait(1000);//wait for 1000 ms or till a task is complete
             }
             waited = true;
           }
@@ -289,7 +290,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
                     .getId() + " message:" + message.toString());
             Runner runner = new Runner(messageHandler, message, operation, head,
                 lock);
-            ParWork.getExecutor().submit(runner);
+            ParWork.getExecutor().submit(runner, true);
           }
 
         } catch (InterruptedException | AlreadyClosedException e) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 526f4cc..7757cdd 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -380,8 +380,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
   private void processReplicaAddPropertyCommand(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results)
       throws Exception {
     checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, ZkStateReader.NUM_SHARDS_PROP, "shards", REPLICA_PROP, PROPERTY_PROP, PROPERTY_VALUE_PROP);
-    SolrZkClient zkClient = zkStateReader.getZkClient();
-    Map<String, Object> propMap = new HashMap<>();
+    Map<String, Object> propMap = new HashMap<>(message.getProperties().size() + 1);
     propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICAPROP.toLower());
     propMap.putAll(message.getProperties());
     ZkNodeProps m = new ZkNodeProps(propMap);
@@ -391,8 +390,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
   private void processReplicaDeletePropertyCommand(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results)
       throws Exception {
     checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP);
-    SolrZkClient zkClient = zkStateReader.getZkClient();
-    Map<String, Object> propMap = new HashMap<>();
+    Map<String, Object> propMap = new HashMap<>(message.getProperties().size() + 1);
     propMap.put(Overseer.QUEUE_OPERATION, DELETEREPLICAPROP.toLower());
     propMap.putAll(message.getProperties());
     ZkNodeProps m = new ZkNodeProps(propMap);
@@ -406,7 +404,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
               "' parameters are required for the BALANCESHARDUNIQUE operation, no action taken");
     }
     SolrZkClient zkClient = zkStateReader.getZkClient();
-    Map<String, Object> m = new HashMap<>();
+    Map<String, Object> m = new HashMap<>(message.getProperties().size() + 1);
     m.put(Overseer.QUEUE_OPERATION, BALANCESHARDUNIQUE.toLower());
     m.putAll(message.getProperties());
     overseer.offerStateUpdate(Utils.toJSON(m));
@@ -432,7 +430,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
       return collection;
     } else {
       Map<String, Object> shards = (Map<String, Object>) collection.get("shards");
-      Map<String, Object>  selected = new HashMap<>();
+      Map<String, Object>  selected = new HashMap<>(1);
       for (String selectedShard : requestedShards) {
         if (!shards.containsKey(selectedShard)) {
           throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + name + " shard: " + selectedShard + " not found");
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
index 3e9c829..34d702f 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
@@ -33,6 +33,7 @@ import org.apache.solr.client.solrj.routing.ReplicaListTransformerFactory;
 import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.ParWork;
+import org.apache.solr.common.ParWorkExecService;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -470,7 +471,8 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
    * Creates a new completion service for use by a single set of distributed requests.
    */
   public CompletionService newCompletionService() {
-    return new ExecutorCompletionService<ShardResponse>(ParWork.getExecutor());
+    return new SolrExecutorCompletionService<ShardResponse>(
+        (ParWorkExecService) ParWork.getExecutor());
   } // ### expert usage
 
 
diff --git a/solr/core/src/java/org/apache/solr/handler/component/SolrExecutorCompletionService.java b/solr/core/src/java/org/apache/solr/handler/component/SolrExecutorCompletionService.java
new file mode 100644
index 0000000..a4c6998
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/component/SolrExecutorCompletionService.java
@@ -0,0 +1,95 @@
+//
+// Source code recreated from a .class file by IntelliJ IDEA
+// (powered by FernFlower decompiler)
+//
+
+package org.apache.solr.handler.component;
+
+import org.apache.solr.common.ParWorkExecService;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+
+/** {@link CompletionService} on a {@link ParWorkExecService}; each task's future is queued once the task is done. */
+public class SolrExecutorCompletionService<V> implements CompletionService<V> {
+  private final ParWorkExecService executor;
+  private final BlockingQueue<Future<V>> completionQueue;
+
+  /** Uses a new {@link LinkedBlockingQueue} as the completion queue; throws NPE if {@code executor} is null. */
+  public SolrExecutorCompletionService(ParWorkExecService executor) {
+    this(executor, new LinkedBlockingQueue<>());
+  }
+
+  /** Uses the supplied completion queue; throws NullPointerException if either argument is null. */
+  public SolrExecutorCompletionService(ParWorkExecService executor, BlockingQueue<Future<V>> completionQueue) {
+    if (executor == null || completionQueue == null) {
+      throw new NullPointerException();
+    }
+    this.executor = executor;
+    this.completionQueue = completionQueue;
+  }
+
+  /** Runs {@code task} via the executor's single-argument {@code execute}; throws NPE if {@code task} is null. */
+  @Override
+  public Future<V> submit(Callable<V> task) {
+    if (task == null) {
+      throw new NullPointerException();
+    }
+    RunnableFuture<V> f = new FutureTask<>(task);
+    executor.execute(new QueueingFuture<>(f, completionQueue));
+    return f;
+  }
+
+  /** Runs {@code task}; the returned future yields {@code result}. Throws NPE if {@code task} is null. */
+  @Override
+  public Future<V> submit(Runnable task, V result) {
+    if (task == null) {
+      throw new NullPointerException();
+    }
+    RunnableFuture<V> f = new FutureTask<>(task, result);
+    // NOTE(review): passes requiresAnotherThread=true, unlike submit(Callable) -- confirm intentional.
+    executor.execute(new QueueingFuture<>(f, completionQueue), true);
+    return f;
+  }
+
+  /** Removes and returns the next completed future, waiting until one is available. */
+  @Override
+  public Future<V> take() throws InterruptedException {
+    return completionQueue.take();
+  }
+
+  /** Removes and returns the next completed future, or {@code null} if none is available. */
+  @Override
+  public Future<V> poll() {
+    return completionQueue.poll();
+  }
+
+  /** Like {@link #poll()}, but waits up to the given timeout for a completed future. */
+  @Override
+  public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
+    return completionQueue.poll(timeout, unit);
+  }
+
+  /** Wraps a task so that its future is added to the completion queue when the wrapper finishes. */
+  private static class QueueingFuture<V> extends FutureTask<Void> {
+    private final Future<V> task;
+    private final BlockingQueue<Future<V>> completionQueue;
+
+    QueueingFuture(RunnableFuture<V> task, BlockingQueue<Future<V>> completionQueue) {
+      super(task, null);
+      this.task = task;
+      this.completionQueue = completionQueue;
+    }
+
+    @Override
+    protected void done() {
+      completionQueue.add(task);
+    }
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index bc0dc98..969659b 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -236,7 +236,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
         }
         if (isLeader) {
 
-          try (ParWork worker = new ParWork(this)) {
+          try (ParWork worker = new ParWork(this, false, true)) {
             worker.collect(() -> {
               log.info("Do a local commit on NRT endpoint for leader");
               try {
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 8e592ab..259c9ea 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -625,7 +625,7 @@ public class ParWork implements Closeable {
       Integer minThreads;
       Integer maxThreads;
       minThreads = Integer.getInteger("solr.per_thread_exec.min_threads", 3);
-      maxThreads = Integer.getInteger("solr.per_thread_exec.max_threads",  Runtime.getRuntime().availableProcessors() / 3);
+      maxThreads = Integer.getInteger("solr.per_thread_exec.max_threads",  Runtime.getRuntime().availableProcessors());
       exec = getExecutorService(Math.max(minThreads, maxThreads)); // keep alive directly affects how long a worker might
       // be stuck in poll without an enqueue on shutdown
       THREAD_LOCAL_EXECUTOR.set(exec);
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
index a1ae52b..1baf365 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
@@ -283,6 +283,16 @@ public class ParWorkExecService implements ExecutorService {
 
   @Override
   public void execute(Runnable runnable) {
+    execute(runnable, false);
+  }
+
+
+  public void execute(Runnable runnable, boolean requiresAnotherThread) {
+    if (requiresAnotherThread) {
+       service.submit(runnable);
+       return;
+    }
+
     boolean success = checkLoad();
     if (success) {
       success = available.tryAcquire();
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkConfigManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkConfigManager.java
index c24bd3b..60d4193 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkConfigManager.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkConfigManager.java
@@ -26,8 +26,11 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 
@@ -140,7 +143,7 @@ public class ZkConfigManager {
     }
   }
 
-  private void copyConfigDirFromZk(String fromZkPath, String toZkPath, Set<String> copiedToZkPaths) throws IOException {
+  private void copyConfigDirFromZk(String fromZkPath, String toZkPath, Set<String> copiedToZkPaths,  Map<String,byte[]> dataMap ) throws IOException {
     try {
       List<String> files = zkClient.getChildren(root + fromZkPath, null, true);
       for (String file : files) {
@@ -149,10 +152,10 @@ public class ZkConfigManager {
           final String toZkFilePath = toZkPath + "/" + file;
           log.info("Copying zk node {}/{} to {}", fromZkPath, file, toZkFilePath);
           byte[] data = zkClient.getData(root + fromZkPath + "/" + file, null, null);
-          zkClient.makePath(toZkFilePath, data, true);
+          dataMap.put(toZkFilePath, data);
           if (copiedToZkPaths != null) copiedToZkPaths.add(toZkFilePath);
         } else {
-          copyConfigDirFromZk(root + fromZkPath + "/" + file, toZkPath + "/" + file, copiedToZkPaths);
+          copyConfigDirFromZk(root + fromZkPath + "/" + file, toZkPath + "/" + file, copiedToZkPaths, dataMap);
         }
       }
     } catch (KeeperException | InterruptedException e) {
@@ -182,7 +185,13 @@ public class ZkConfigManager {
    * @throws IOException if an I/O error occurs
    */
   public void copyConfigDir(String fromConfig, String toConfig, Set<String> copiedToZkPaths) throws IOException {
-    copyConfigDirFromZk(CONFIGS_ZKNODE + "/" + fromConfig, CONFIGS_ZKNODE + "/" + toConfig, copiedToZkPaths);
+    Map<String,byte[]> dataMap = new HashMap();
+    copyConfigDirFromZk(CONFIGS_ZKNODE + "/" + fromConfig, CONFIGS_ZKNODE + "/" + toConfig, copiedToZkPaths, dataMap);
+    try {
+      zkClient.mkdirs(dataMap, 1);
+    } catch (KeeperException e) {
+      throw new IOException("Error copying nodes from zookeeper path " + fromConfig + " to " + toConfig, e);
+    }
   }
 
   // This method is used by configSetUploadTool and CreateTool to resolve the configset directory.