You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/04 11:16:08 UTC
[lucene-solr] 02/13: @721 Convert some more executor usage to shared execs.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 7ffb68c0bb6f17b83908304bbc9f2bd8037f0f55
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Sep 3 11:08:31 2020 -0500
@721 Convert some more executor usage to shared execs.
---
.../solr/analytics/stream/AnalyticsShardRequestManager.java | 5 +++--
.../core/src/java/org/apache/solr/handler/IndexFetcher.java | 13 +++++++------
.../solr/handler/component/IterativeMergeStrategy.java | 13 +++++++------
3 files changed, 17 insertions(+), 14 deletions(-)
diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java
index 4ce5eb6..12e7b2c 100644
--- a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java
@@ -37,6 +37,7 @@ import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
@@ -142,7 +143,7 @@ public class AnalyticsShardRequestManager {
* @throws IOException if an exception occurs while sending requests.
*/
private void streamFromShards() throws IOException {
- ExecutorService service = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("SolrAnalyticsStream"));
+ ExecutorService service = ParWork.getRootSharedExecutor();
List<Future<SolrException>> futures = new ArrayList<>();
List<AnalyticsShardRequester> openers = new ArrayList<>();
for (String replicaUrl : replicaUrls) {
@@ -163,11 +164,11 @@ public class AnalyticsShardRequestManager {
}
}
} catch (InterruptedException e1) {
+ ParWork.propegateInterrupt(e1);
throw new RuntimeException(e1);
} catch (ExecutionException e1) {
throw new RuntimeException(e1);
} finally {
- service.shutdown();
for (AnalyticsShardRequester opener : openers) {
opener.close();
}
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 93d7eeb..fcf59a7 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -154,6 +154,8 @@ public class IndexFetcher {
private volatile ExecutorService fsyncService;
+ private volatile Future<?> fsyncServiceFuture;
+
private volatile boolean stop = false;
private boolean useInternalCompression = false;
@@ -515,7 +517,7 @@ public class IndexFetcher {
}
// Create the sync service
- fsyncService = ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("fsyncService"));
+ fsyncService = ParWork.getRootSharedExecutor();
// use a synchronized list because the list is read by other threads (to show details)
filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
// if the generation of master is older than that of the slave , it means they are not compatible to be copied
@@ -750,8 +752,8 @@ public class IndexFetcher {
markReplicationStop();
dirFileFetcher = null;
localFileFetcher = null;
- if (fsyncService != null && !fsyncService.isShutdown()) fsyncService.shutdown();
fsyncService = null;
+ fsyncServiceFuture = null;
stop = false;
fsyncException = null;
} finally {
@@ -809,10 +811,9 @@ public class IndexFetcher {
* terminate the fsync service and wait for all the tasks to complete. If it is already terminated
*/
private void terminateAndWaitFsyncService() throws Exception {
- if (fsyncService.isTerminated()) return;
- fsyncService.shutdown();
+ if (fsyncServiceFuture == null) return;
// give a long wait say 1 hr
- fsyncService.awaitTermination(3600, TimeUnit.SECONDS);
+ fsyncServiceFuture.get(3600, TimeUnit.SECONDS);
// if any fsync failed, throw that exception back
Exception fsyncExceptionCopy = fsyncException;
if (fsyncExceptionCopy != null) throw fsyncExceptionCopy;
@@ -1742,7 +1743,7 @@ public class IndexFetcher {
} finally {
cleanup();
//if cleanup succeeds . The file is downloaded fully. do an fsync
- fsyncService.submit(() -> {
+ fsyncServiceFuture = fsyncService.submit(() -> {
try {
file.sync();
} catch (IOException e) {
diff --git a/solr/core/src/java/org/apache/solr/handler/component/IterativeMergeStrategy.java b/solr/core/src/java/org/apache/solr/handler/component/IterativeMergeStrategy.java
index 08fc7fe..698e7d9 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/IterativeMergeStrategy.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/IterativeMergeStrategy.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import org.apache.http.client.HttpClient;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
@@ -32,6 +33,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
@@ -51,7 +53,7 @@ public abstract class IterativeMergeStrategy implements MergeStrategy {
public void merge(ResponseBuilder rb, ShardRequest sreq) {
rb._responseDocs = new SolrDocumentList(); // Null pointers will occur otherwise.
rb.onePassDistributedQuery = true; // Turn off the second pass distributed.
- executorService = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("IterativeMergeStrategy"));
+ executorService = ParWork.getRootSharedExecutor();
httpClient = getHttpClient();
try {
process(rb, sreq);
@@ -59,7 +61,6 @@ public abstract class IterativeMergeStrategy implements MergeStrategy {
throw new RuntimeException(e);
} finally {
HttpClientUtil.close(httpClient);
- executorService.shutdownNow();
}
}
@@ -79,13 +80,13 @@ public abstract class IterativeMergeStrategy implements MergeStrategy {
}
- public class CallBack implements Callable<CallBack> {
+ public static class CallBack implements Callable<CallBack> {
private HttpSolrClient solrClient;
private QueryRequest req;
private QueryResponse response;
private ShardResponse originalShardResponse;
- public CallBack(ShardResponse originalShardResponse, QueryRequest req) {
+ public CallBack(ShardResponse originalShardResponse, QueryRequest req, HttpClient httpClient) {
this.solrClient = new Builder(originalShardResponse.getShardAddress())
.withHttpClient(httpClient)
@@ -116,13 +117,13 @@ public abstract class IterativeMergeStrategy implements MergeStrategy {
@SuppressWarnings({"unchecked", "rawtypes"})
List<Future<CallBack>> futures = new ArrayList();
for(ShardResponse response : responses) {
- futures.add(this.executorService.submit(new CallBack(response, req)));
+ futures.add(this.executorService.submit(new CallBack(response, req, httpClient)));
}
return futures;
}
public Future<CallBack> callBack(ShardResponse response, QueryRequest req) {
- return this.executorService.submit(new CallBack(response, req));
+ return this.executorService.submit(new CallBack(response, req, httpClient));
}
protected abstract void process(ResponseBuilder rb, ShardRequest sreq) throws Exception;