You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/07/29 02:02:52 UTC
[lucene-solr] 19/27: @436 Fix distrib updates.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 776b6bfab1e370030caec8d856a0b46f6d17f183
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Tue Jul 28 16:16:45 2020 -0500
@436 Fix distrib updates.
---
.../org/apache/solr/update/SolrCmdDistributor.java | 5 ++-
.../processor/DistributedZkUpdateProcessor.java | 30 ++++++++--------
.../reporters/SolrJmxReporterCloudTest.java | 1 -
.../solr/client/solrj/impl/Http2SolrClient.java | 42 +++++++++++++---------
4 files changed, 43 insertions(+), 35 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index 9e7f77e..9c16c1d 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -76,7 +76,7 @@ public class SolrCmdDistributor implements Closeable {
};
public SolrCmdDistributor(UpdateShardHandler updateShardHandler) {
- this.solrClient = updateShardHandler.getUpdateOnlyHttpClient();
+ this.solrClient = new Http2SolrClient.Builder().markInternalRequest().withHttpClient(updateShardHandler.getUpdateOnlyHttpClient()).build();
}
/* For tests only */
@@ -86,12 +86,11 @@ public class SolrCmdDistributor implements Closeable {
public void finish() {
assert !finished : "lifecycle sanity check";
- phaser.arriveAndAwaitAdvance();
finished = true;
}
public void close() {
-
+ ParWork.close(solrClient);
}
public boolean checkRetry(Error err) {
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index fc8677b..6c6a684 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -210,7 +210,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
if (replicaType == Replica.Type.PULL) {
log.warn("Commit not supported on replicas of type " + Replica.Type.PULL);
} else if (replicaType == Replica.Type.NRT) {
- log.info("Do a local commit on NRT endpoint");
+ log.info("Do a local commit on NRT endpoint for replica");
doLocalCommit(cmd);
}
} else {
@@ -235,7 +235,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
}
if (isLeader) {
- log.info("Do a local commit on NRT endpoint");
+ log.info("Do a local commit on NRT endpoint for leader");
doLocalCommit(cmd);
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
@@ -244,6 +244,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
useNodes = getReplicaNodesForLeader(cloudDesc.getShardId(), leaderReplica);
+ log.info("Found the following replicas to send commit to {}", useNodes);
+
if (useNodes != null && useNodes.size() > 0) {
log.info("send commit to replicas nodes={}", useNodes);
@@ -253,16 +255,16 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
List<SolrCmdDistributor.Node> finalUseNodes1 = useNodes;
Future<?> future = ParWork.getExecutor().submit(() -> cmdDistrib.distribCommit(cmd, finalUseNodes1, params));
- if (useNodes != null && useNodes.size() > 0 && cmd.waitSearcher) {
- try {
- future.get();
- } catch (InterruptedException e) {
- ParWork.propegateInterrupt(e);
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- } catch (ExecutionException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- }
- }
+// if (useNodes != null && useNodes.size() > 0 && cmd.waitSearcher) {
+// try {
+// future.get();
+// } catch (InterruptedException e) {
+// ParWork.propegateInterrupt(e);
+// throw new SolrException(ErrorCode.SERVER_ERROR, e);
+// } catch (ExecutionException e) {
+// throw new SolrException(ErrorCode.SERVER_ERROR, e);
+// }
+// }
}
}
@@ -270,9 +272,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
}
- if (log.isDebugEnabled()) {
- log.debug("processCommit(CommitUpdateCommand) - end");
- }
+ log.info("processCommit(CommitUpdateCommand) - end");
}
}
diff --git a/solr/core/src/test/org/apache/solr/metrics/reporters/SolrJmxReporterCloudTest.java b/solr/core/src/test/org/apache/solr/metrics/reporters/SolrJmxReporterCloudTest.java
index 6d4bb68..66fc77f 100644
--- a/solr/core/src/test/org/apache/solr/metrics/reporters/SolrJmxReporterCloudTest.java
+++ b/solr/core/src/test/org/apache/solr/metrics/reporters/SolrJmxReporterCloudTest.java
@@ -70,7 +70,6 @@ public class SolrJmxReporterCloudTest extends SolrCloudTestCase {
@Test
public void testJmxReporter() throws Exception {
- CollectionAdminRequest.reloadCollection(COLLECTION).process(cluster.getSolrClient());
CloudHttp2SolrClient solrClient = cluster.getSolrClient();
// index some docs
for (int i = 0; i < 100; i++) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index d6af422..792559f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -236,7 +236,7 @@ public class Http2SolrClient extends SolrClient {
HTTP2Client http2client = new HTTP2Client();
transport = new HttpClientTransportOverHTTP2(http2client);
httpClient = new HttpClient(transport, sslContextFactory);
- httpClient.setMaxConnectionsPerDestination(4);
+ httpClient.setMaxConnectionsPerDestination(10);
}
httpClientExecutor = new SolrQueuedThreadPool("httpClient");
httpClientExecutor.setMaxThreads(Math.max(4 , Runtime.getRuntime().availableProcessors()));
@@ -277,14 +277,6 @@ public class Http2SolrClient extends SolrClient {
});
}
closer.collect(() -> {
- if (httpClientExecutor != null) {
- try {
- httpClientExecutor.prepareToStop();
- } catch (Exception e) {
- ParWork.propegateInterrupt(e);
- throw new RuntimeException(e);
- }
- }
// we wait for async requests, so far devs don't want to give sugar for this
asyncTracker.waitForCompleteFinal();
if (httpClientExecutor != null) {
@@ -388,8 +380,17 @@ public class Http2SolrClient extends SolrClient {
}
decorateRequest(postRequest, updateRequest);
- InputStreamResponseListener responseListener = new InputStreamResponseListener();
- asyncTracker.phaser.register();
+ InputStreamResponseListener responseListener = new InputStreamResponseListener() {
+ @Override
+ public void onComplete(Result result) {
+ try {
+ super.onComplete(result);
+ } finally {
+ asyncTracker.completeListener.onComplete(result);
+ }
+ }
+ };
+ asyncTracker.register();
postRequest.send(responseListener);
boolean isXml = ClientUtils.TEXT_XML.equals(requestWriter.getUpdateContentType());
@@ -434,7 +435,7 @@ public class Http2SolrClient extends SolrClient {
? this.parser: solrRequest.getResponseParser();
if (onComplete != null) {
// This async call only suitable for indexing since the response size is limited by 5MB
- asyncTracker.phaser.register();
+ asyncTracker.register();
req.send(new BufferingResponseListener(5 * 1024 * 1024) {
@Override
@@ -467,11 +468,14 @@ public class Http2SolrClient extends SolrClient {
InputStreamResponseListener listener = new InputStreamResponseListener() {
@Override
public void onComplete(Result result) {
- super.onComplete(result);
- asyncTracker.completeListener.onComplete(result);
+ try {
+ super.onComplete(result);
+ } finally {
+ asyncTracker.completeListener.onComplete(result);
+ }
}
};
- asyncTracker.phaser.register();
+ asyncTracker.register();
req.send(listener);
Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS);
InputStream is = listener.getInputStream();
@@ -880,7 +884,6 @@ public class Http2SolrClient extends SolrClient {
}
private class AsyncTracker {
- private final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// nocommit - look at outstanding max again
private static final int MAX_OUTSTANDING_REQUESTS = 1000;
@@ -926,6 +929,13 @@ public class Http2SolrClient extends SolrClient {
if (log.isDebugEnabled()) log.debug("After wait for complete final registered: {} arrived: {}", phaser.getRegisteredParties(), phaser.getArrivedParties());
}
+
+ public void register() {
+ if (log.isDebugEnabled()) {
+ log.debug("Registered new party");
+ }
+ phaser.register();
+ }
}
public static class Builder {