You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/11 09:58:54 UTC
[lucene-solr] 01/02: @502 Executor tweaks.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit f3ca929a16677520a2ec2671c1ca1c3b828553da
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Tue Aug 11 04:46:50 2020 -0500
@502 Executor tweaks.
---
gradle/testing/defaults-tests.gradle | 9 +-
.../apache/solr/cloud/OverseerTaskProcessor.java | 5 +-
.../OverseerCollectionMessageHandler.java | 10 +--
.../handler/component/HttpShardHandlerFactory.java | 4 +-
.../component/SolrExecutorCompletionService.java | 95 ++++++++++++++++++++++
.../processor/DistributedZkUpdateProcessor.java | 2 +-
.../src/java/org/apache/solr/common/ParWork.java | 2 +-
.../org/apache/solr/common/ParWorkExecService.java | 10 +++
.../apache/solr/common/cloud/ZkConfigManager.java | 17 +++-
9 files changed, 131 insertions(+), 23 deletions(-)
diff --git a/gradle/testing/defaults-tests.gradle b/gradle/testing/defaults-tests.gradle
index 6d77123..6f62865 100644
--- a/gradle/testing/defaults-tests.gradle
+++ b/gradle/testing/defaults-tests.gradle
@@ -68,14 +68,7 @@ allprojects {
minHeapSize = propertyOrDefault("tests.minheapsize", "256m")
maxHeapSize = propertyOrDefault("tests.heapsize", "512m")
- int apc;
- if (maxParallelForks > 1) {
- apc = (int) Math.max(1, Runtime.runtime.availableProcessors() / 3.0d);
- } else {
- apc = Runtime.runtime.availableProcessors();
- }
-
- jvmArgs Commandline.translateCommandline(propertyOrDefault("tests.jvmargs", "-XX:TieredStopAtLevel=1 -XX:+UseParallelGC -XX:-UseBiasedLocking -XX:ActiveProcessorCount=" + apc));
+ jvmArgs Commandline.translateCommandline(propertyOrDefault("tests.jvmargs", "-XX:TieredStopAtLevel=1 -XX:+UseParallelGC -XX:-UseBiasedLocking"));
systemProperty 'java.util.logging.config.file', file("${commonDir}/tools/junit4/logging.properties")
systemProperty 'java.awt.headless', 'true'
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index 2a3d1fe..da36776 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableSet;
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
+import org.apache.solr.common.ParWorkExecService;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -198,7 +199,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
while (runningTasksSize() > MAX_PARALLEL_TASKS) {
synchronized (waitLock) {
- waitLock.wait(1000);//wait for 100 ms or till a task is complete
+ waitLock.wait(1000);//wait for 1000 ms or till a task is complete
}
waited = true;
}
@@ -289,7 +290,7 @@ public class OverseerTaskProcessor implements Runnable, Closeable {
.getId() + " message:" + message.toString());
Runner runner = new Runner(messageHandler, message, operation, head,
lock);
- ParWork.getExecutor().submit(runner);
+ ParWork.getExecutor().submit(runner, true);
}
} catch (InterruptedException | AlreadyClosedException e) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 526f4cc..7757cdd 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -380,8 +380,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
private void processReplicaAddPropertyCommand(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results)
throws Exception {
checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, ZkStateReader.NUM_SHARDS_PROP, "shards", REPLICA_PROP, PROPERTY_PROP, PROPERTY_VALUE_PROP);
- SolrZkClient zkClient = zkStateReader.getZkClient();
- Map<String, Object> propMap = new HashMap<>();
+ Map<String, Object> propMap = new HashMap<>(message.getProperties().size() + 1);
propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICAPROP.toLower());
propMap.putAll(message.getProperties());
ZkNodeProps m = new ZkNodeProps(propMap);
@@ -391,8 +390,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
private void processReplicaDeletePropertyCommand(ClusterState clusterState, ZkNodeProps message, @SuppressWarnings({"rawtypes"})NamedList results)
throws Exception {
checkRequired(message, COLLECTION_PROP, SHARD_ID_PROP, REPLICA_PROP, PROPERTY_PROP);
- SolrZkClient zkClient = zkStateReader.getZkClient();
- Map<String, Object> propMap = new HashMap<>();
+ Map<String, Object> propMap = new HashMap<>(message.getProperties().size() + 1);
propMap.put(Overseer.QUEUE_OPERATION, DELETEREPLICAPROP.toLower());
propMap.putAll(message.getProperties());
ZkNodeProps m = new ZkNodeProps(propMap);
@@ -406,7 +404,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
"' parameters are required for the BALANCESHARDUNIQUE operation, no action taken");
}
SolrZkClient zkClient = zkStateReader.getZkClient();
- Map<String, Object> m = new HashMap<>();
+ Map<String, Object> m = new HashMap<>(message.getProperties().size() + 1);
m.put(Overseer.QUEUE_OPERATION, BALANCESHARDUNIQUE.toLower());
m.putAll(message.getProperties());
overseer.offerStateUpdate(Utils.toJSON(m));
@@ -432,7 +430,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
return collection;
} else {
Map<String, Object> shards = (Map<String, Object>) collection.get("shards");
- Map<String, Object> selected = new HashMap<>();
+ Map<String, Object> selected = new HashMap<>(1);
for (String selectedShard : requestedShards) {
if (!shards.containsKey(selectedShard)) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Collection: " + name + " shard: " + selectedShard + " not found");
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
index 3e9c829..34d702f 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
@@ -33,6 +33,7 @@ import org.apache.solr.client.solrj.routing.ReplicaListTransformerFactory;
import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.ParWork;
+import org.apache.solr.common.ParWorkExecService;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -470,7 +471,8 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
* Creates a new completion service for use by a single set of distributed requests.
*/
public CompletionService newCompletionService() {
- return new ExecutorCompletionService<ShardResponse>(ParWork.getExecutor());
+ return new SolrExecutorCompletionService<ShardResponse>(
+ (ParWorkExecService) ParWork.getExecutor());
} // ### expert usage
diff --git a/solr/core/src/java/org/apache/solr/handler/component/SolrExecutorCompletionService.java b/solr/core/src/java/org/apache/solr/handler/component/SolrExecutorCompletionService.java
new file mode 100644
index 0000000..a4c6998
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/component/SolrExecutorCompletionService.java
@@ -0,0 +1,95 @@
+//
+// Source code recreated from a .class file by IntelliJ IDEA
+// (powered by FernFlower decompiler)
+//
+
+package org.apache.solr.handler.component;
+
+import org.apache.solr.common.ParWorkExecService;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+
+/** CompletionService that runs its tasks on a ParWorkExecService and queues each Future once its task is done. */
+public class SolrExecutorCompletionService<V> implements CompletionService<V> {
+  private final ParWorkExecService executor;
+  private final BlockingQueue<Future<V>> completionQueue;
+
+  /** Creates a service on the given executor, using an unbounded LinkedBlockingQueue as the completion queue. */
+  public SolrExecutorCompletionService(ParWorkExecService executor) {
+    this(executor, new LinkedBlockingQueue<>());
+  }
+
+  /** Creates a service that publishes finished tasks to the supplied queue; throws NullPointerException if either argument is null. */
+  public SolrExecutorCompletionService(ParWorkExecService executor, BlockingQueue<Future<V>> completionQueue) {
+    if (executor == null || completionQueue == null) {
+      throw new NullPointerException();
+    }
+    this.executor = executor;
+    this.completionQueue = completionQueue;
+  }
+
+  /** Wraps the task in a FutureTask and hands it to executor.execute(Runnable); throws NullPointerException if task is null. */
+  @Override
+  public Future<V> submit(Callable<V> task) {
+    if (task == null) {
+      throw new NullPointerException();
+    }
+    RunnableFuture<V> f = new FutureTask<>(task);
+    executor.execute(new QueueingFuture<>(f, completionQueue));
+    return f;
+  }
+
+  /**
+   * Wraps the task in a FutureTask and hands it to executor.execute(Runnable, true); throws NullPointerException if task is null.
+   * NOTE(review): unlike submit(Callable), this passes true as the second argument - confirm the asymmetry is intended.
+   */
+  @Override
+  public Future<V> submit(Runnable task, V result) {
+    if (task == null) {
+      throw new NullPointerException();
+    }
+    RunnableFuture<V> f = new FutureTask<>(task, result);
+    executor.execute(new QueueingFuture<>(f, completionQueue), true);
+    return f;
+  }
+
+  @Override
+  public Future<V> take() throws InterruptedException {
+    return completionQueue.take();
+  }
+
+  @Override
+  public Future<V> poll() {
+    return completionQueue.poll();
+  }
+
+  @Override
+  public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
+    return completionQueue.poll(timeout, unit);
+  }
+
+  /** FutureTask wrapper that runs the given task and adds that task's Future to the completion queue when done. */
+  private static class QueueingFuture<V> extends FutureTask<Void> {
+    private final Future<V> task;
+    private final BlockingQueue<Future<V>> completionQueue;
+
+    QueueingFuture(RunnableFuture<V> task, BlockingQueue<Future<V>> completionQueue) {
+      super(task, null);
+      this.task = task;
+      this.completionQueue = completionQueue;
+    }
+
+    /** Publishes the wrapped task's Future to the completion queue. */
+    @Override
+    protected void done() {
+      completionQueue.add(task);
+    }
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index bc0dc98..969659b 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -236,7 +236,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
}
if (isLeader) {
- try (ParWork worker = new ParWork(this)) {
+ try (ParWork worker = new ParWork(this, false, true)) {
worker.collect(() -> {
log.info("Do a local commit on NRT endpoint for leader");
try {
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 8e592ab..259c9ea 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -625,7 +625,7 @@ public class ParWork implements Closeable {
Integer minThreads;
Integer maxThreads;
minThreads = Integer.getInteger("solr.per_thread_exec.min_threads", 3);
- maxThreads = Integer.getInteger("solr.per_thread_exec.max_threads", Runtime.getRuntime().availableProcessors() / 3);
+ maxThreads = Integer.getInteger("solr.per_thread_exec.max_threads", Runtime.getRuntime().availableProcessors());
exec = getExecutorService(Math.max(minThreads, maxThreads)); // keep alive directly affects how long a worker might
// be stuck in poll without an enqueue on shutdown
THREAD_LOCAL_EXECUTOR.set(exec);
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
index a1ae52b..1baf365 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
@@ -283,6 +283,16 @@ public class ParWorkExecService implements ExecutorService {
@Override
public void execute(Runnable runnable) {
+ execute(runnable, false);
+ }
+
+
+ public void execute(Runnable runnable, boolean requiresAnotherThread) {
+ if (requiresAnotherThread) {
+ service.submit(runnable);
+ return;
+ }
+
boolean success = checkLoad();
if (success) {
success = available.tryAcquire();
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkConfigManager.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkConfigManager.java
index c24bd3b..60d4193 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkConfigManager.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkConfigManager.java
@@ -26,8 +26,11 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
@@ -140,7 +143,7 @@ public class ZkConfigManager {
}
}
- private void copyConfigDirFromZk(String fromZkPath, String toZkPath, Set<String> copiedToZkPaths) throws IOException {
+ private void copyConfigDirFromZk(String fromZkPath, String toZkPath, Set<String> copiedToZkPaths, Map<String,byte[]> dataMap ) throws IOException {
try {
List<String> files = zkClient.getChildren(root + fromZkPath, null, true);
for (String file : files) {
@@ -149,10 +152,10 @@ public class ZkConfigManager {
final String toZkFilePath = toZkPath + "/" + file;
log.info("Copying zk node {}/{} to {}", fromZkPath, file, toZkFilePath);
byte[] data = zkClient.getData(root + fromZkPath + "/" + file, null, null);
- zkClient.makePath(toZkFilePath, data, true);
+ dataMap.put(toZkFilePath, data);
if (copiedToZkPaths != null) copiedToZkPaths.add(toZkFilePath);
} else {
- copyConfigDirFromZk(root + fromZkPath + "/" + file, toZkPath + "/" + file, copiedToZkPaths);
+ copyConfigDirFromZk(root + fromZkPath + "/" + file, toZkPath + "/" + file, copiedToZkPaths, dataMap);
}
}
} catch (KeeperException | InterruptedException e) {
@@ -182,7 +185,13 @@ public class ZkConfigManager {
* @throws IOException if an I/O error occurs
*/
public void copyConfigDir(String fromConfig, String toConfig, Set<String> copiedToZkPaths) throws IOException {
- copyConfigDirFromZk(CONFIGS_ZKNODE + "/" + fromConfig, CONFIGS_ZKNODE + "/" + toConfig, copiedToZkPaths);
+ Map<String,byte[]> dataMap = new HashMap<>(); // filled by copyConfigDirFromZk, then written with a single mkdirs call below
+ copyConfigDirFromZk(CONFIGS_ZKNODE + "/" + fromConfig, CONFIGS_ZKNODE + "/" + toConfig, copiedToZkPaths, dataMap);
+ try {
+ zkClient.mkdirs(dataMap, 1);
+ } catch (KeeperException e) {
+ throw new IOException("Error copying nodes from zookeeper path " + fromConfig + " to " + toConfig, e);
+ }
}
// This method is used by configSetUploadTool and CreateTool to resolve the configset directory.