You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/07/29 02:02:52 UTC

[lucene-solr] 19/27: @436 Fix distrib updates.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 776b6bfab1e370030caec8d856a0b46f6d17f183
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Tue Jul 28 16:16:45 2020 -0500

    @436 Fix distrib updates.
---
 .../org/apache/solr/update/SolrCmdDistributor.java |  5 ++-
 .../processor/DistributedZkUpdateProcessor.java    | 30 ++++++++--------
 .../reporters/SolrJmxReporterCloudTest.java        |  1 -
 .../solr/client/solrj/impl/Http2SolrClient.java    | 42 +++++++++++++---------
 4 files changed, 43 insertions(+), 35 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index 9e7f77e..9c16c1d 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -76,7 +76,7 @@ public class SolrCmdDistributor implements Closeable {
   };
 
   public SolrCmdDistributor(UpdateShardHandler updateShardHandler) {
-    this.solrClient = updateShardHandler.getUpdateOnlyHttpClient();
+    this.solrClient = new Http2SolrClient.Builder().markInternalRequest().withHttpClient(updateShardHandler.getUpdateOnlyHttpClient()).build();
   }
   
   /* For tests only */
@@ -86,12 +86,11 @@ public class SolrCmdDistributor implements Closeable {
   
   public void finish() {
     assert !finished : "lifecycle sanity check";
-    phaser.arriveAndAwaitAdvance();
     finished = true;
   }
   
   public void close() {
-
+    ParWork.close(solrClient);
   }
 
   public boolean checkRetry(Error err) {
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index fc8677b..6c6a684 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -210,7 +210,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
         if (replicaType == Replica.Type.PULL) {
           log.warn("Commit not supported on replicas of type " + Replica.Type.PULL);
         } else if (replicaType == Replica.Type.NRT) {
-          log.info("Do a local commit on NRT endpoint");
+          log.info("Do a local commit on NRT endpoint for replica");
           doLocalCommit(cmd);
         }
       } else {
@@ -235,7 +235,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
         }
         if (isLeader) {
 
-          log.info("Do a local commit on NRT endpoint");
+          log.info("Do a local commit on NRT endpoint for leader");
           doLocalCommit(cmd);
 
           params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
@@ -244,6 +244,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
           useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), leaderReplica);
 
+          log.info("Found the following replicas to send commit to {}", useNodes);
+
           if (useNodes != null && useNodes.size() > 0) {
             log.info("send commit to replicas nodes={}", useNodes);
 
@@ -253,16 +255,16 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
             List<SolrCmdDistributor.Node> finalUseNodes1 = useNodes;
             Future<?> future = ParWork.getExecutor().submit(() -> cmdDistrib.distribCommit(cmd, finalUseNodes1, params));
-            if (useNodes != null && useNodes.size() > 0 && cmd.waitSearcher) {
-              try {
-                future.get();
-              } catch (InterruptedException e) {
-                ParWork.propegateInterrupt(e);
-                throw new SolrException(ErrorCode.SERVER_ERROR, e);
-              } catch (ExecutionException e) {
-                throw new SolrException(ErrorCode.SERVER_ERROR, e);
-              }
-            }
+//            if (useNodes != null && useNodes.size() > 0 && cmd.waitSearcher) {
+//              try {
+//                future.get();
+//              } catch (InterruptedException e) {
+//                ParWork.propegateInterrupt(e);
+//                throw new SolrException(ErrorCode.SERVER_ERROR, e);
+//              } catch (ExecutionException e) {
+//                throw new SolrException(ErrorCode.SERVER_ERROR, e);
+//              }
+//            }
           }
 
         }
@@ -270,9 +272,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
       }
 
-      if (log.isDebugEnabled()) {
-        log.debug("processCommit(CommitUpdateCommand) - end");
-      }
+      log.info("processCommit(CommitUpdateCommand) - end");
     }
   }
 
diff --git a/solr/core/src/test/org/apache/solr/metrics/reporters/SolrJmxReporterCloudTest.java b/solr/core/src/test/org/apache/solr/metrics/reporters/SolrJmxReporterCloudTest.java
index 6d4bb68..66fc77f 100644
--- a/solr/core/src/test/org/apache/solr/metrics/reporters/SolrJmxReporterCloudTest.java
+++ b/solr/core/src/test/org/apache/solr/metrics/reporters/SolrJmxReporterCloudTest.java
@@ -70,7 +70,6 @@ public class SolrJmxReporterCloudTest extends SolrCloudTestCase {
 
   @Test
   public void testJmxReporter() throws Exception {
-    CollectionAdminRequest.reloadCollection(COLLECTION).process(cluster.getSolrClient());
     CloudHttp2SolrClient solrClient = cluster.getSolrClient();
     // index some docs
     for (int i = 0; i < 100; i++) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index d6af422..792559f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -236,7 +236,7 @@ public class Http2SolrClient extends SolrClient {
       HTTP2Client http2client = new HTTP2Client();
       transport = new HttpClientTransportOverHTTP2(http2client);
       httpClient = new HttpClient(transport, sslContextFactory);
-      httpClient.setMaxConnectionsPerDestination(4);
+      httpClient.setMaxConnectionsPerDestination(10);
     }
     httpClientExecutor = new SolrQueuedThreadPool("httpClient");
     httpClientExecutor.setMaxThreads(Math.max(4 , Runtime.getRuntime().availableProcessors()));
@@ -277,14 +277,6 @@ public class Http2SolrClient extends SolrClient {
         });
       }
       closer.collect(() -> {
-        if (httpClientExecutor != null) {
-          try {
-            httpClientExecutor.prepareToStop();
-          } catch (Exception e) {
-            ParWork.propegateInterrupt(e);
-            throw new RuntimeException(e);
-          }
-        }
         // we wait for async requests, so far devs don't want to give sugar for this
         asyncTracker.waitForCompleteFinal();
         if (httpClientExecutor != null) {
@@ -388,8 +380,17 @@ public class Http2SolrClient extends SolrClient {
     }
 
     decorateRequest(postRequest, updateRequest);
-    InputStreamResponseListener responseListener = new InputStreamResponseListener();
-    asyncTracker.phaser.register();
+    InputStreamResponseListener responseListener = new InputStreamResponseListener() {
+      @Override
+      public void onComplete(Result result) {
+        try {
+          super.onComplete(result);
+        } finally {
+          asyncTracker.completeListener.onComplete(result);
+        }
+      }
+    };
+    asyncTracker.register();
     postRequest.send(responseListener);
 
     boolean isXml = ClientUtils.TEXT_XML.equals(requestWriter.getUpdateContentType());
@@ -434,7 +435,7 @@ public class Http2SolrClient extends SolrClient {
         ? this.parser: solrRequest.getResponseParser();
     if (onComplete != null) {
       // This async call only suitable for indexing since the response size is limited by 5MB
-      asyncTracker.phaser.register();
+      asyncTracker.register();
       req.send(new BufferingResponseListener(5 * 1024 * 1024) {
 
         @Override
@@ -467,11 +468,14 @@ public class Http2SolrClient extends SolrClient {
         InputStreamResponseListener listener = new InputStreamResponseListener() {
           @Override
           public void onComplete(Result result) {
-            super.onComplete(result);
-            asyncTracker.completeListener.onComplete(result);
+            try {
+              super.onComplete(result);
+            } finally {
+              asyncTracker.completeListener.onComplete(result);
+            }
           }
         };
-        asyncTracker.phaser.register();
+        asyncTracker.register();
         req.send(listener);
         Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS);
         InputStream is = listener.getInputStream();
@@ -880,7 +884,6 @@ public class Http2SolrClient extends SolrClient {
   }
 
   private class AsyncTracker {
-    private final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
     // nocommit - look at outstanding max again
     private static final int MAX_OUTSTANDING_REQUESTS = 1000;
@@ -926,6 +929,13 @@ public class Http2SolrClient extends SolrClient {
 
       if (log.isDebugEnabled()) log.debug("After wait for complete final registered: {} arrived: {}", phaser.getRegisteredParties(), phaser.getArrivedParties());
     }
+
+    public void register() {
+      if (log.isDebugEnabled()) {
+        log.debug("Registered new party");
+      }
+      phaser.register();
+    }
   }
 
   public static class Builder {