You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/07/28 21:17:32 UTC

[lucene-solr] branch reference_impl_dev updated (a1a06fc -> 6a2e4ef)

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a change to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git.


 discard a1a06fc  @436 Fix distrib updates.
     new 6a2e4ef  @436 Fix distrib updates.

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force) and generates
a repository containing something like this:

 * -- * -- B -- O -- O -- O   (a1a06fc)
            \
             N -- N -- N   refs/heads/reference_impl_dev (6a2e4ef)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Any revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/solr/update/SolrCmdDistributor.java |  6 ++--
 .../processor/DistributedZkUpdateProcessor.java    | 30 ++++++++--------
 .../reporters/SolrJmxReporterCloudTest.java        |  1 -
 .../solr/client/solrj/impl/Http2SolrClient.java    | 42 +++++++++++++---------
 4 files changed, 43 insertions(+), 36 deletions(-)


[lucene-solr] 01/01: @436 Fix distrib updates.

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 6a2e4ef6529a1cfa682ee93f3bf0457f786c45f3
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Tue Jul 28 16:16:45 2020 -0500

    @436 Fix distrib updates.
---
 .../org/apache/solr/update/SolrCmdDistributor.java |  5 ++-
 .../processor/DistributedZkUpdateProcessor.java    | 30 ++++++++--------
 .../reporters/SolrJmxReporterCloudTest.java        |  1 -
 .../solr/client/solrj/impl/Http2SolrClient.java    | 42 +++++++++++++---------
 4 files changed, 43 insertions(+), 35 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index 9e7f77e..9c16c1d 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -76,7 +76,7 @@ public class SolrCmdDistributor implements Closeable {
   };
 
   public SolrCmdDistributor(UpdateShardHandler updateShardHandler) {
-    this.solrClient = updateShardHandler.getUpdateOnlyHttpClient();
+    this.solrClient = new Http2SolrClient.Builder().markInternalRequest().withHttpClient(updateShardHandler.getUpdateOnlyHttpClient()).build();
   }
   
   /* For tests only */
@@ -86,12 +86,11 @@ public class SolrCmdDistributor implements Closeable {
   
   public void finish() {
     assert !finished : "lifecycle sanity check";
-    phaser.arriveAndAwaitAdvance();
     finished = true;
   }
   
   public void close() {
-
+    ParWork.close(solrClient);
   }
 
   public boolean checkRetry(Error err) {
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index fc8677b..6c6a684 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -210,7 +210,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
         if (replicaType == Replica.Type.PULL) {
           log.warn("Commit not supported on replicas of type " + Replica.Type.PULL);
         } else if (replicaType == Replica.Type.NRT) {
-          log.info("Do a local commit on NRT endpoint");
+          log.info("Do a local commit on NRT endpoint for replica");
           doLocalCommit(cmd);
         }
       } else {
@@ -235,7 +235,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
         }
         if (isLeader) {
 
-          log.info("Do a local commit on NRT endpoint");
+          log.info("Do a local commit on NRT endpoint for leader");
           doLocalCommit(cmd);
 
           params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
@@ -244,6 +244,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
           useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), leaderReplica);
 
+          log.info("Found the following replicas to send commit to {}", useNodes);
+
           if (useNodes != null && useNodes.size() > 0) {
             log.info("send commit to replicas nodes={}", useNodes);
 
@@ -253,16 +255,16 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
             List<SolrCmdDistributor.Node> finalUseNodes1 = useNodes;
             Future<?> future = ParWork.getExecutor().submit(() -> cmdDistrib.distribCommit(cmd, finalUseNodes1, params));
-            if (useNodes != null && useNodes.size() > 0 && cmd.waitSearcher) {
-              try {
-                future.get();
-              } catch (InterruptedException e) {
-                ParWork.propegateInterrupt(e);
-                throw new SolrException(ErrorCode.SERVER_ERROR, e);
-              } catch (ExecutionException e) {
-                throw new SolrException(ErrorCode.SERVER_ERROR, e);
-              }
-            }
+//            if (useNodes != null && useNodes.size() > 0 && cmd.waitSearcher) {
+//              try {
+//                future.get();
+//              } catch (InterruptedException e) {
+//                ParWork.propegateInterrupt(e);
+//                throw new SolrException(ErrorCode.SERVER_ERROR, e);
+//              } catch (ExecutionException e) {
+//                throw new SolrException(ErrorCode.SERVER_ERROR, e);
+//              }
+//            }
           }
 
         }
@@ -270,9 +272,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
       }
 
-      if (log.isDebugEnabled()) {
-        log.debug("processCommit(CommitUpdateCommand) - end");
-      }
+      log.info("processCommit(CommitUpdateCommand) - end");
     }
   }
 
diff --git a/solr/core/src/test/org/apache/solr/metrics/reporters/SolrJmxReporterCloudTest.java b/solr/core/src/test/org/apache/solr/metrics/reporters/SolrJmxReporterCloudTest.java
index 6d4bb68..66fc77f 100644
--- a/solr/core/src/test/org/apache/solr/metrics/reporters/SolrJmxReporterCloudTest.java
+++ b/solr/core/src/test/org/apache/solr/metrics/reporters/SolrJmxReporterCloudTest.java
@@ -70,7 +70,6 @@ public class SolrJmxReporterCloudTest extends SolrCloudTestCase {
 
   @Test
   public void testJmxReporter() throws Exception {
-    CollectionAdminRequest.reloadCollection(COLLECTION).process(cluster.getSolrClient());
     CloudHttp2SolrClient solrClient = cluster.getSolrClient();
     // index some docs
     for (int i = 0; i < 100; i++) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index d6af422..792559f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -236,7 +236,7 @@ public class Http2SolrClient extends SolrClient {
       HTTP2Client http2client = new HTTP2Client();
       transport = new HttpClientTransportOverHTTP2(http2client);
       httpClient = new HttpClient(transport, sslContextFactory);
-      httpClient.setMaxConnectionsPerDestination(4);
+      httpClient.setMaxConnectionsPerDestination(10);
     }
     httpClientExecutor = new SolrQueuedThreadPool("httpClient");
     httpClientExecutor.setMaxThreads(Math.max(4 , Runtime.getRuntime().availableProcessors()));
@@ -277,14 +277,6 @@ public class Http2SolrClient extends SolrClient {
         });
       }
       closer.collect(() -> {
-        if (httpClientExecutor != null) {
-          try {
-            httpClientExecutor.prepareToStop();
-          } catch (Exception e) {
-            ParWork.propegateInterrupt(e);
-            throw new RuntimeException(e);
-          }
-        }
         // we wait for async requests, so far devs don't want to give sugar for this
         asyncTracker.waitForCompleteFinal();
         if (httpClientExecutor != null) {
@@ -388,8 +380,17 @@ public class Http2SolrClient extends SolrClient {
     }
 
     decorateRequest(postRequest, updateRequest);
-    InputStreamResponseListener responseListener = new InputStreamResponseListener();
-    asyncTracker.phaser.register();
+    InputStreamResponseListener responseListener = new InputStreamResponseListener() {
+      @Override
+      public void onComplete(Result result) {
+        try {
+          super.onComplete(result);
+        } finally {
+          asyncTracker.completeListener.onComplete(result);
+        }
+      }
+    };
+    asyncTracker.register();
     postRequest.send(responseListener);
 
     boolean isXml = ClientUtils.TEXT_XML.equals(requestWriter.getUpdateContentType());
@@ -434,7 +435,7 @@ public class Http2SolrClient extends SolrClient {
         ? this.parser: solrRequest.getResponseParser();
     if (onComplete != null) {
       // This async call only suitable for indexing since the response size is limited by 5MB
-      asyncTracker.phaser.register();
+      asyncTracker.register();
       req.send(new BufferingResponseListener(5 * 1024 * 1024) {
 
         @Override
@@ -467,11 +468,14 @@ public class Http2SolrClient extends SolrClient {
         InputStreamResponseListener listener = new InputStreamResponseListener() {
           @Override
           public void onComplete(Result result) {
-            super.onComplete(result);
-            asyncTracker.completeListener.onComplete(result);
+            try {
+              super.onComplete(result);
+            } finally {
+              asyncTracker.completeListener.onComplete(result);
+            }
           }
         };
-        asyncTracker.phaser.register();
+        asyncTracker.register();
         req.send(listener);
         Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS);
         InputStream is = listener.getInputStream();
@@ -880,7 +884,6 @@ public class Http2SolrClient extends SolrClient {
   }
 
   private class AsyncTracker {
-    private final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
     // nocommit - look at outstanding max again
     private static final int MAX_OUTSTANDING_REQUESTS = 1000;
@@ -926,6 +929,13 @@ public class Http2SolrClient extends SolrClient {
 
       if (log.isDebugEnabled()) log.debug("After wait for complete final registered: {} arrived: {}", phaser.getRegisteredParties(), phaser.getArrivedParties());
     }
+
+    public void register() {
+      if (log.isDebugEnabled()) {
+        log.debug("Registered new party");
+      }
+      phaser.register();
+    }
   }
 
   public static class Builder {