You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/04 11:16:08 UTC

[lucene-solr] 02/13: @721 Convert some more executor usage to shared execs.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 7ffb68c0bb6f17b83908304bbc9f2bd8037f0f55
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Sep 3 11:08:31 2020 -0500

    @721 Convert some more executor usage to shared execs.
---
 .../solr/analytics/stream/AnalyticsShardRequestManager.java |  5 +++--
 .../core/src/java/org/apache/solr/handler/IndexFetcher.java | 13 +++++++------
 .../solr/handler/component/IterativeMergeStrategy.java      | 13 +++++++------
 3 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java
index 4ce5eb6..12e7b2c 100644
--- a/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java
+++ b/solr/contrib/analytics/src/java/org/apache/solr/analytics/stream/AnalyticsShardRequestManager.java
@@ -37,6 +37,7 @@ import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.Replica;
@@ -142,7 +143,7 @@ public class AnalyticsShardRequestManager {
    * @throws IOException if an exception occurs while sending requests.
    */
   private void streamFromShards() throws IOException {
-    ExecutorService service = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("SolrAnalyticsStream"));
+    ExecutorService service = ParWork.getRootSharedExecutor();
     List<Future<SolrException>> futures = new ArrayList<>();
     List<AnalyticsShardRequester> openers = new ArrayList<>();
     for (String replicaUrl : replicaUrls) {
@@ -163,11 +164,11 @@ public class AnalyticsShardRequestManager {
         }
       }
     } catch (InterruptedException e1) {
+      ParWork.propegateInterrupt(e1);
       throw new RuntimeException(e1);
     } catch (ExecutionException e1) {
       throw new RuntimeException(e1);
     } finally {
-      service.shutdown();
       for (AnalyticsShardRequester opener : openers) {
         opener.close();
       }
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 93d7eeb..fcf59a7 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -154,6 +154,8 @@ public class IndexFetcher {
 
   private volatile ExecutorService fsyncService;
 
+  private volatile Future<?> fsyncServiceFuture;
+
   private volatile boolean stop = false;
 
   private boolean useInternalCompression = false;
@@ -515,7 +517,7 @@ public class IndexFetcher {
       }
 
       // Create the sync service
-      fsyncService = ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("fsyncService"));
+      fsyncService = ParWork.getRootSharedExecutor();
       // use a synchronized list because the list is read by other threads (to show details)
       filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
       // if the generation of master is older than that of the slave , it means they are not compatible to be copied
@@ -750,8 +752,8 @@ public class IndexFetcher {
       markReplicationStop();
       dirFileFetcher = null;
       localFileFetcher = null;
-      if (fsyncService != null && !fsyncService.isShutdown()) fsyncService.shutdown();
       fsyncService = null;
+      fsyncServiceFuture = null;
       stop = false;
       fsyncException = null;
     } finally {
@@ -809,10 +811,9 @@ public class IndexFetcher {
    * Wait for the most recently submitted fsync task to complete; return immediately if no fsync task was submitted.
    */
   private void terminateAndWaitFsyncService() throws Exception {
-    if (fsyncService.isTerminated()) return;
-    fsyncService.shutdown();
+    if (fsyncServiceFuture == null) return;
     // allow a long wait for the fsync task: up to 1 hour
-    fsyncService.awaitTermination(3600, TimeUnit.SECONDS);
+    fsyncServiceFuture.get(3600, TimeUnit.SECONDS);
     // if any fsync failed, throw that exception back
     Exception fsyncExceptionCopy = fsyncException;
     if (fsyncExceptionCopy != null) throw fsyncExceptionCopy;
@@ -1742,7 +1743,7 @@ public class IndexFetcher {
       } finally {
         cleanup();
         // If cleanup succeeds, the file has been fully downloaded; do an fsync.
-        fsyncService.submit(() -> {
+        fsyncServiceFuture = fsyncService.submit(() -> {
           try {
             file.sync();
           } catch (IOException e) {
diff --git a/solr/core/src/java/org/apache/solr/handler/component/IterativeMergeStrategy.java b/solr/core/src/java/org/apache/solr/handler/component/IterativeMergeStrategy.java
index 08fc7fe..698e7d9 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/IterativeMergeStrategy.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/IterativeMergeStrategy.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
+import org.apache.http.client.HttpClient;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.impl.HttpClientUtil;
@@ -32,6 +33,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.ExecutorUtil;
@@ -51,7 +53,7 @@ public abstract class IterativeMergeStrategy implements MergeStrategy  {
   public void merge(ResponseBuilder rb, ShardRequest sreq) {
     rb._responseDocs = new SolrDocumentList(); // Null pointers will occur otherwise.
     rb.onePassDistributedQuery = true;   // Turn off the second pass distributed.
-    executorService = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("IterativeMergeStrategy"));
+    executorService = ParWork.getRootSharedExecutor();
     httpClient = getHttpClient();
     try {
       process(rb, sreq);
@@ -59,7 +61,6 @@ public abstract class IterativeMergeStrategy implements MergeStrategy  {
       throw new RuntimeException(e);
     } finally {
       HttpClientUtil.close(httpClient);
-      executorService.shutdownNow();
     }
   }
 
@@ -79,13 +80,13 @@ public abstract class IterativeMergeStrategy implements MergeStrategy  {
 
   }
 
-  public class CallBack implements Callable<CallBack> {
+  public static class CallBack implements Callable<CallBack> {
     private HttpSolrClient solrClient;
     private QueryRequest req;
     private QueryResponse response;
     private ShardResponse originalShardResponse;
 
-    public CallBack(ShardResponse originalShardResponse, QueryRequest req) {
+    public CallBack(ShardResponse originalShardResponse, QueryRequest req, HttpClient httpClient) {
 
       this.solrClient = new Builder(originalShardResponse.getShardAddress())
           .withHttpClient(httpClient)
@@ -116,13 +117,13 @@ public abstract class IterativeMergeStrategy implements MergeStrategy  {
     @SuppressWarnings({"unchecked", "rawtypes"})
     List<Future<CallBack>> futures = new ArrayList();
     for(ShardResponse response : responses) {
-      futures.add(this.executorService.submit(new CallBack(response, req)));
+      futures.add(this.executorService.submit(new CallBack(response, req, httpClient)));
     }
     return futures;
   }
 
   public Future<CallBack> callBack(ShardResponse response, QueryRequest req) {
-    return this.executorService.submit(new CallBack(response, req));
+    return this.executorService.submit(new CallBack(response, req, httpClient));
   }
 
   protected abstract void process(ResponseBuilder rb, ShardRequest sreq) throws Exception;