You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2020/04/30 08:52:34 UTC
[lucene-solr] branch jira/SOLR-14354 updated: SOLR-14354: Async or
using threads in better way for HttpShardHandler
This is an automated email from the ASF dual-hosted git repository.
datcm pushed a commit to branch jira/SOLR-14354
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/jira/SOLR-14354 by this push:
new 4d6894b SOLR-14354: Async or using threads in better way for HttpShardHandler
new 7d06eea Merge branch 'jira/SOLR-14354' of github.com:apache/lucene-solr into jira/SOLR-14354
4d6894b is described below
commit 4d6894bca3b477dde2cbfdf68007784136e7c360
Author: Cao Manh Dat <da...@apache.org>
AuthorDate: Thu Apr 30 15:49:50 2020 +0700
SOLR-14354: Async or using threads in better way for HttpShardHandler
---
.../solr/handler/component/HttpShardHandler.java | 158 +++++++++----------
.../handler/component/HttpShardHandlerFactory.java | 22 +--
.../solr/handler/component/QueryComponent.java | 2 +-
.../apache/solr/update/MockingHttp2SolrClient.java | 20 +--
.../solr/client/solrj/impl/Http2SolrClient.java | 170 +++++++++++++--------
.../solr/client/solrj/impl/LBHttp2SolrClient.java | 142 +++++++++++++++++
.../solr/client/solrj/impl/LBSolrClient.java | 98 +++++++++++-
.../apache/solr/client/solrj/util/Cancellable.java | 22 +++
.../solr/client/solrj/impl/LBSolrClientTest.java | 93 +++++++++++
9 files changed, 543 insertions(+), 184 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
index f23cf16..1fcb317 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
@@ -17,17 +17,14 @@
package org.apache.solr.handler.component;
import java.io.IOException;
-import java.net.ConnectException;
import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import io.opentracing.Span;
import io.opentracing.Tracer;
@@ -36,9 +33,11 @@ import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
+import org.apache.solr.client.solrj.impl.LBHttp2SolrClient;
import org.apache.solr.client.solrj.impl.LBSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
+import org.apache.solr.client.solrj.util.Cancellable;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
@@ -54,7 +53,6 @@ import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.util.tracing.GlobalTracer;
import org.apache.solr.util.tracing.SolrRequestCarrier;
-import org.slf4j.MDC;
public class HttpShardHandler extends ShardHandler {
/**
@@ -64,18 +62,23 @@ public class HttpShardHandler extends ShardHandler {
* by the RealtimeGet handler, since other types of replicas shouldn't respond to RTG requests
*/
public static String ONLY_NRT_REPLICAS = "distribOnlyRealtime";
+ private static final ShardResponse END_QUEUE = new ShardResponse();
private HttpShardHandlerFactory httpShardHandlerFactory;
- private CompletionService<ShardResponse> completionService;
- private Set<Future<ShardResponse>> pending;
+ private LinkedList<Cancellable> requests;
+ private BlockingQueue<ShardResponse> responses;
+ private AtomicInteger pending;
private Map<String, List<String>> shardToURLs;
private Http2SolrClient httpClient;
+ private LBHttp2SolrClient lbClient;
public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory, Http2SolrClient httpClient) {
this.httpClient = httpClient;
this.httpShardHandlerFactory = httpShardHandlerFactory;
- completionService = httpShardHandlerFactory.newCompletionService();
- pending = new HashSet<>();
+ this.lbClient = httpShardHandlerFactory.loadbalancer;
+ this.pending = new AtomicInteger(0);
+ this.responses = new LinkedBlockingQueue<>();
+ this.requests = new LinkedList<>();
// maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574")
// This is primarily to keep track of what order we should use to query the replicas of a shard
@@ -130,77 +133,64 @@ public class HttpShardHandler extends ShardHandler {
final Tracer tracer = GlobalTracer.getTracer();
final Span span = tracer != null ? tracer.activeSpan() : null;
- Callable<ShardResponse> task = () -> {
+ params.remove(CommonParams.WT); // use default (currently javabin)
+ params.remove(CommonParams.VERSION);
+ QueryRequest req = makeQueryRequest(sreq, params, shard);
+ req.setMethod(SolrRequest.METHOD.POST);
- ShardResponse srsp = new ShardResponse();
- if (sreq.nodeName != null) {
- srsp.setNodeName(sreq.nodeName);
- }
- srsp.setShardRequest(sreq);
- srsp.setShard(shard);
- SimpleSolrResponse ssr = new SimpleSolrResponse();
- srsp.setSolrResponse(ssr);
- long startTime = System.nanoTime();
+ LBSolrClient.Req lbReq = httpShardHandlerFactory.newLBHttpSolrClientReq(req, urls);
+
+ ShardResponse srsp = new ShardResponse();
+ if (sreq.nodeName != null) {
+ srsp.setNodeName(sreq.nodeName);
+ }
+ srsp.setShardRequest(sreq);
+ srsp.setShard(shard);
+ SimpleSolrResponse ssr = new SimpleSolrResponse();
+ srsp.setSolrResponse(ssr);
+
+ pending.incrementAndGet();
+ // if there are no shards available for a slice, urls.size()==0
+ if (urls.size() == 0) {
+ // TODO: what's the right error code here? We should use the same thing when
+ // all of the servers for a shard are down.
+ SolrException exception = new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
+ srsp.setException(exception);
+ srsp.setResponseCode(exception.code());
+ responses.add(srsp);
+ return;
+ }
- try {
- params.remove(CommonParams.WT); // use default (currently javabin)
- params.remove(CommonParams.VERSION);
+ requests.add(this.lbClient.asyncReq(lbReq, new LBHttp2SolrClient.OnComplete() {
+ long startTime = System.nanoTime();
- QueryRequest req = makeQueryRequest(sreq, params, shard);
+ @Override
+ public void onStart() {
if (tracer != null && span != null) {
tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new SolrRequestCarrier(req));
}
- req.setMethod(SolrRequest.METHOD.POST);
SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
if (requestInfo != null) req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
-
- // no need to set the response parser as binary is the defaultJab
- // req.setResponseParser(new BinaryResponseParser());
-
- // if there are no shards available for a slice, urls.size()==0
- if (urls.size() == 0) {
- // TODO: what's the right error code here? We should use the same thing when
- // all of the servers for a shard are down.
- throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
- }
-
- if (urls.size() <= 1) {
- String url = urls.get(0);
- srsp.setShardAddress(url);
- ssr.nl = request(url, req);
- } else {
- LBSolrClient.Rsp rsp = httpShardHandlerFactory.makeLoadBalancedRequest(req, urls);
- ssr.nl = rsp.getResponse();
- srsp.setShardAddress(rsp.getServer());
- }
- } catch (ConnectException cex) {
- srsp.setException(cex); //????
- } catch (Exception th) {
- srsp.setException(th);
- if (th instanceof SolrException) {
- srsp.setResponseCode(((SolrException) th).code());
- } else {
- srsp.setResponseCode(-1);
- }
}
- ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-
- return transfomResponse(sreq, srsp, shard);
- };
-
- try {
- if (shard != null) {
- MDC.put("ShardRequest.shards", shard);
+ @Override
+ public void onComplete(LBSolrClient.Rsp rsp) {
+ ssr.nl = rsp.getResponse();
+ srsp.setShardAddress(rsp.getServer());
+ ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+ responses.add(srsp);
}
- if (urls != null && !urls.isEmpty()) {
- MDC.put("ShardRequest.urlList", urls.toString());
+
+ @Override
+ public void onError(Throwable throwable) {
+ ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+ srsp.setException(throwable);
+ if (throwable instanceof SolrException) {
+ srsp.setResponseCode(((SolrException) throwable).code());
+ }
+ responses.add(srsp);
}
- pending.add(completionService.submit(task));
- } finally {
- MDC.remove("ShardRequest.shards");
- MDC.remove("ShardRequest.urlList");
- }
+ }));
}
protected NamedList<Object> request(String url, SolrRequest req) throws IOException, SolrServerException {
@@ -243,12 +233,13 @@ public class HttpShardHandler extends ShardHandler {
}
private ShardResponse take(boolean bailOnError) {
+ try {
+ while (pending.get() > 0) {
+ ShardResponse rsp = responses.take();
+ if (rsp == END_QUEUE)
+ continue;
- while (pending.size() > 0) {
- try {
- Future<ShardResponse> future = completionService.take();
- pending.remove(future);
- ShardResponse rsp = future.get();
+ pending.decrementAndGet();
if (bailOnError && rsp.getException() != null) return rsp; // if exception, return immediately
// add response to the response list... we do this after the take() and
// not after the completion of "call" so we know when the last response
@@ -258,13 +249,9 @@ public class HttpShardHandler extends ShardHandler {
if (rsp.getShardRequest().responses.size() == rsp.getShardRequest().actualShards.length) {
return rsp;
}
- } catch (InterruptedException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
- } catch (ExecutionException e) {
- // should be impossible... the problem with catching the exception
- // at this level is we don't know what ShardRequest it applied to
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Impossible Exception", e);
}
+ } catch (InterruptedException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
}
return null;
}
@@ -272,9 +259,12 @@ public class HttpShardHandler extends ShardHandler {
@Override
public void cancelAll() {
- for (Future<ShardResponse> future : pending) {
- future.cancel(false);
+ for (Cancellable cancellable : requests) {
+ cancellable.cancel();
+ pending.decrementAndGet();
}
+ // ensure that we do not hang on responses.take()
+ responses.add(END_QUEUE);
}
@Override
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
index 1617dcb..83af91d 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
@@ -96,7 +96,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
protected volatile Http2SolrClient defaultClient;
protected InstrumentedHttpListenerFactory httpListenerFactory;
- private LBHttp2SolrClient loadbalancer;
+ protected LBHttp2SolrClient loadbalancer;
int corePoolSize = 0;
int maximumPoolSize = Integer.MAX_VALUE;
@@ -316,6 +316,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
this.defaultClient = new Http2SolrClient.Builder()
.connectionTimeout(connectionTimeout)
.idleTimeout(soTimeout)
+ .withExecutor(commExecutor)
.maxConnectionsPerHost(maxConnectionsPerHost).build();
this.defaultClient.addListenerFactory(this.httpListenerFactory);
this.loadbalancer = new LBHttp2SolrClient(defaultClient);
@@ -368,18 +369,6 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
return solrMetricsContext;
}
- /**
- * Makes a request to one or more of the given urls, using the configured load balancer.
- *
- * @param req The solr search request that should be sent through the load balancer
- * @param urls The list of solr server urls to load balance across
- * @return The response from the request
- */
- public LBSolrClient.Rsp makeLoadBalancedRequest(final QueryRequest req, List<String> urls)
- throws SolrServerException, IOException {
- return loadbalancer.request(newLBHttpSolrClientReq(req, urls));
- }
-
protected LBSolrClient.Req newLBHttpSolrClientReq(final QueryRequest req, List<String> urls) {
int numServersToTry = (int)Math.floor(urls.size() * this.permittedLoadBalancerRequestsMaximumFraction);
if (numServersToTry < this.permittedLoadBalancerRequestsMinimumAbsolute) {
@@ -425,13 +414,6 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
}
/**
- * Creates a new completion service for use by a single set of distributed requests.
- */
- public CompletionService<ShardResponse> newCompletionService() {
- return new ExecutorCompletionService<>(commExecutor);
- }
-
- /**
* Rebuilds the URL replacing the URL scheme of the passed URL with the
* configured scheme replacement.If no scheme was configured, the passed URL's
* scheme is left alone.
diff --git a/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java b/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java
index b03997a..debc41b 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/QueryComponent.java
@@ -853,7 +853,7 @@ public class QueryComponent extends SearchComponent
if (srsp.getException() != null) {
Throwable t = srsp.getException();
if(t instanceof SolrServerException) {
- t = ((SolrServerException)t).getCause();
+ t = t.getCause();
}
nl.add("error", t.toString() );
StringWriter trace = new StringWriter();
diff --git a/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java b/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java
index c9aded7..1a69511 100644
--- a/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java
+++ b/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java
@@ -28,6 +28,7 @@ import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.util.Cancellable;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.NamedList;
@@ -118,20 +119,20 @@ public class MockingHttp2SolrClient extends Http2SolrClient {
return super.request(request, collection);
}
- public NamedList<Object> request(SolrRequest request, String collection, OnComplete onComplete)
- throws SolrServerException, IOException {
+ @Override
+ public Cancellable asyncRequest(SolrRequest request, String collection, OnComplete onComplete) {
if (request instanceof UpdateRequest) {
UpdateRequest ur = (UpdateRequest) request;
// won't throw exception if request is DBQ
if (ur.getDeleteQuery() != null && !ur.getDeleteQuery().isEmpty()) {
- return super.request(request, collection, onComplete);
+ return super.asyncRequest(request, collection, onComplete);
}
}
if (exp != null) {
if (oneExpPerReq) {
if (reqGotException.contains(request)) {
- return super.request(request, collection, onComplete);
+ return super.asyncRequest(request, collection, onComplete);
}
else
reqGotException.add(request);
@@ -140,17 +141,12 @@ public class MockingHttp2SolrClient extends Http2SolrClient {
Exception e = exception();
if (e instanceof IOException) {
if (LuceneTestCase.random().nextBoolean()) {
- throw (IOException) e;
- } else {
- throw new SolrServerException(e);
+ e = new SolrServerException(e);
}
- } else if (e instanceof SolrServerException) {
- throw (SolrServerException) e;
- } else {
- throw new SolrServerException(e);
}
+ onComplete.onFailure(e);
}
- return super.request(request, collection, onComplete);
+ return super.asyncRequest(request, collection, onComplete);
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 6a08816..4fbcd9f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -34,27 +34,35 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpStatus;
import org.apache.http.entity.ContentType;
import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.V2RequestSupport;
import org.apache.solr.client.solrj.embedded.SSLConfig;
+import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.response.DelegationTokenResponse;
+import org.apache.solr.client.solrj.util.Cancellable;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.client.solrj.util.Constants;
import org.apache.solr.common.SolrException;
@@ -133,6 +141,7 @@ public class Http2SolrClient extends SolrClient {
*/
private String serverBaseUrl;
private boolean closeClient;
+ private ExecutorService executor;
protected Http2SolrClient(String serverBaseUrl, Builder builder) {
if (serverBaseUrl != null) {
@@ -176,8 +185,14 @@ public class Http2SolrClient extends SolrClient {
HttpClient httpClient;
BlockingArrayQueue<Runnable> queue = new BlockingArrayQueue<>(256, 256);
- ThreadPoolExecutor httpClientExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(32,
- 256, 60, TimeUnit.SECONDS, queue, new SolrNamedThreadFactory("h2sc"));
+ Executor executor = builder.executor;
+ if (executor == null) {
+ this.executor = new ExecutorUtil.MDCAwareThreadPoolExecutor(32,
+ 256, 60, TimeUnit.SECONDS, queue, new SolrNamedThreadFactory("h2sc"));
+ executor = this.executor;
+ }
+ this.executor = Objects.requireNonNullElseGet(builder.executor, () -> new ExecutorUtil.MDCAwareThreadPoolExecutor(32,
+ 256, 60, TimeUnit.SECONDS, queue, new SolrNamedThreadFactory("h2sc")));
SslContextFactory.Client sslContextFactory;
boolean ssl;
@@ -208,7 +223,7 @@ public class Http2SolrClient extends SolrClient {
httpClient.setMaxConnectionsPerDestination(4);
}
- httpClient.setExecutor(httpClientExecutor);
+ httpClient.setExecutor(this.executor);
httpClient.setStrictEventOrdering(false);
httpClient.setConnectBlocking(true);
httpClient.setFollowRedirects(false);
@@ -230,7 +245,6 @@ public class Http2SolrClient extends SolrClient {
asyncTracker.waitForComplete();
if (closeClient) {
try {
- ExecutorService executor = (ExecutorService) httpClient.getExecutor();
httpClient.setStopTimeout(1000);
httpClient.stop();
ExecutorUtil.shutdownAndAwaitTermination(executor);
@@ -359,65 +373,95 @@ public class Http2SolrClient extends SolrClient {
outStream.flush();
}
- public NamedList<Object> request(SolrRequest solrRequest,
- String collection,
- OnComplete onComplete) throws IOException, SolrServerException {
- Request req = makeRequest(solrRequest, collection);
+ private static final Exception CANCELLED_EXCEPTION = new Exception();
+
+ public Cancellable asyncRequest(SolrRequest solrRequest, String collection, OnComplete onComplete) {
+ Request req;
+ try {
+ req = makeRequest(solrRequest, collection);
+ } catch (SolrServerException | IOException e) {
+ onComplete.onFailure(e);
+ return () -> {};
+ }
final ResponseParser parser = solrRequest.getResponseParser() == null
? this.parser: solrRequest.getResponseParser();
-
- if (onComplete != null) {
- // This async call only suitable for indexing since the response size is limited by 5MB
- req.onRequestQueued(asyncTracker.queuedListener)
- .onComplete(asyncTracker.completeListener).send(new BufferingResponseListener(5 * 1024 * 1024) {
-
- @Override
- public void onComplete(Result result) {
- if (result.isFailed()) {
- onComplete.onFailure(result.getFailure());
- return;
+ req.onRequestQueued(asyncTracker.queuedListener)
+ .onComplete(asyncTracker.completeListener)
+ .send(new InputStreamResponseListener() {
+ @Override
+ public void onHeaders(Response response) {
+ super.onHeaders(response);
+ InputStreamResponseListener listener = this;
+ executor.execute(() -> {
+ InputStream is = listener.getInputStream();
+ assert ObjectReleaseTracker.track(is);
+ try {
+ NamedList<Object> body = processErrorsAndResponse(response, parser, is, getEncoding(response), isV2ApiRequest(solrRequest));
+ onComplete.onSuccess(body);
+ } catch (RemoteSolrException e) {
+ if (SolrException.getRootCause(e) != CANCELLED_EXCEPTION) {
+ onComplete.onFailure(e);
+ }
+ } catch (SolrServerException e) {
+ onComplete.onFailure(e);
+ }
+ });
}
- NamedList<Object> rsp;
- try {
- InputStream is = getContentAsInputStream();
- assert ObjectReleaseTracker.track(is);
- rsp = processErrorsAndResponse(result.getResponse(),
- parser, is, getEncoding(), isV2ApiRequest(solrRequest));
- onComplete.onSuccess(rsp);
- } catch (Exception e) {
- onComplete.onFailure(e);
+ @Override
+ public void onFailure(Response response, Throwable failure) {
+ super.onFailure(response, failure);
+ if (failure != CANCELLED_EXCEPTION) {
+ onComplete.onFailure(createException(req, failure));
+ }
}
- }
- });
- return null;
- } else {
- try {
- InputStreamResponseListener listener = new InputStreamResponseListener();
- req.send(listener);
- Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS);
- InputStream is = listener.getInputStream();
- assert ObjectReleaseTracker.track(is);
- return processErrorsAndResponse(response, parser, is, getEncoding(response), isV2ApiRequest(solrRequest));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } catch (TimeoutException e) {
- throw new SolrServerException(
- "Timeout occured while waiting response from server at: " + req.getURI(), e);
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- if (cause instanceof ConnectException) {
- throw new SolrServerException("Server refused connection at: " + req.getURI(), cause);
- }
- if (cause instanceof SolrServerException) {
- throw (SolrServerException) cause;
- } else if (cause instanceof IOException) {
- throw new SolrServerException(
- "IOException occured when talking to server at: " + getBaseURL(), cause);
- }
- throw new SolrServerException(cause.getMessage(), cause);
+ });
+ return () -> req.abort(CANCELLED_EXCEPTION);
+ }
+
+ @Override
+ public NamedList<Object> request(SolrRequest solrRequest, String collection) throws SolrServerException, IOException {
+ Request req = makeRequest(solrRequest, collection);
+ final ResponseParser parser = solrRequest.getResponseParser() == null
+ ? this.parser: solrRequest.getResponseParser();
+
+ try {
+ InputStreamResponseListener listener = new InputStreamResponseListener();
+ req.send(listener);
+ Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS);
+ InputStream is = listener.getInputStream();
+ assert ObjectReleaseTracker.track(is);
+ return processErrorsAndResponse(response, parser, is, getEncoding(response), isV2ApiRequest(solrRequest));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Throwable e) {
+ throw createException(req, e);
+ }
+ }
+
+ private SolrServerException createException(Request req, Throwable throwable) {
+ try {
+ throw throwable;
+ } catch (TimeoutException e) {
+ return new SolrServerException(
+ "Timeout occured while waiting response from server at: " + req.getURI(), e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof ConnectException) {
+ return new SolrServerException("Server refused connection at: " + req.getURI(), cause);
+ }
+ if (cause instanceof SolrServerException) {
+ return (SolrServerException) cause;
+ } else if (cause instanceof IOException) {
+ return new SolrServerException(
+ "IOException occured when talking to server at: " + getBaseURL(), cause);
}
+ return new SolrServerException(cause.getMessage(), cause);
+ } catch (Throwable e) {
+ return new SolrServerException(e.getMessage(), e);
}
}
@@ -459,6 +503,7 @@ public class Http2SolrClient extends SolrClient {
private void decorateRequest(Request req, SolrRequest solrRequest) {
req.header(HttpHeader.ACCEPT_ENCODING, null);
+ req.timeout(idleTimeout, TimeUnit.MILLISECONDS);
if (solrRequest.getUserPrincipal() != null) {
req.attribute(REQ_PRINCIPAL_KEY, solrRequest.getUserPrincipal());
}
@@ -748,11 +793,6 @@ public class Http2SolrClient extends SolrClient {
}
}
- @Override
- public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
- return request(request, collection, null);
- }
-
public void setRequestWriter(RequestWriter requestWriter) {
this.requestWriter = requestWriter;
}
@@ -819,6 +859,7 @@ public class Http2SolrClient extends SolrClient {
private Integer maxConnectionsPerHost;
private boolean useHttp1_1 = Boolean.getBoolean("solr.http1");
protected String baseSolrUrl;
+ private ExecutorService executor;
public Builder() {
@@ -840,6 +881,11 @@ public class Http2SolrClient extends SolrClient {
return this;
}
+ public Builder withExecutor(ExecutorService executor) {
+ this.executor = executor;
+ return this;
+ }
+
public Builder withSSLConfig(SSLConfig sslConfig) {
this.sslConfig = sslConfig;
return this;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
index 293a264..19280e0 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
@@ -16,9 +16,23 @@
*/
package org.apache.solr.client.solrj.impl;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.IsUpdateRequest;
+import org.apache.solr.client.solrj.util.Cancellable;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.NamedList;
+import org.slf4j.MDC;
+
+import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
/**
* LBHttp2SolrClient or "LoadBalanced LBHttp2SolrClient" is a load balancing wrapper around
@@ -66,4 +80,132 @@ public class LBHttp2SolrClient extends LBSolrClient {
protected SolrClient getClient(String baseUrl) {
return httpClient;
}
+
+ public interface OnComplete {
+ void onStart();
+ void onComplete(Rsp rsp);
+ void onError(Throwable throwable);
+ }
+
+ public Cancellable asyncReq(Req req, OnComplete onComplete) {
+ Rsp rsp = new Rsp();
+ boolean isNonRetryable = req.request instanceof IsUpdateRequest || ADMIN_PATHS.contains(req.request.getPath());
+ ServerIterator it = new ServerIterator(req, zombieServers);
+ onComplete.onStart();
+ final AtomicBoolean cancelled = new AtomicBoolean(false);
+ AtomicReference<Cancellable> currentCancellable = new AtomicReference<>();
+ RetryListener retryListener = new RetryListener() {
+
+ @Override
+ public void onSuccess(Rsp rsp) {
+ onComplete.onComplete(rsp);
+ }
+
+ @Override
+ public void onFailure(Throwable e, boolean retryReq) {
+ if (retryReq) {
+ String url;
+ try {
+ url = it.nextOrError();
+ } catch (SolrServerException ex) {
+ onComplete.onError(e);
+ return;
+ }
+ try {
+ MDC.put("LBSolrClient.url", url);
+ synchronized (cancelled) {
+ if (cancelled.get()) {
+ return;
+ }
+ Cancellable cancellable = doRequest(url, req, rsp, isNonRetryable, it.isServingZombieServer(), this);
+ currentCancellable.set(cancellable);
+ }
+ } finally {
+ MDC.remove("LBSolrClient.url");
+ }
+ } else {
+ onComplete.onError(e);
+ }
+ }
+ };
+ try {
+ Cancellable cancellable = doRequest(it.nextOrError(), req, rsp, isNonRetryable, it.isServingZombieServer(), retryListener);
+ currentCancellable.set(cancellable);
+ } catch (SolrServerException e) {
+ onComplete.onError(e);
+ }
+ return () -> {
+ synchronized (cancelled) {
+ cancelled.set(true);
+ if (currentCancellable.get() != null) {
+ currentCancellable.get().cancel();
+ }
+ }
+ };
+ }
+
+ private interface RetryListener {
+ void onSuccess(Rsp rsp);
+ void onFailure(Throwable e, boolean retryReq);
+ }
+
+ private Cancellable doRequest(String baseUrl, Req req, Rsp rsp, boolean isNonRetryable,
+ boolean isZombie, RetryListener listener) {
+ rsp.server = baseUrl;
+ req.getRequest().setBasePath(baseUrl);
+ return ((Http2SolrClient)getClient(baseUrl)).asyncRequest(req.getRequest(), null, new Http2SolrClient.OnComplete() {
+ @Override
+ public void onSuccess(NamedList<Object> result) {
+ rsp.rsp = result;
+ if (isZombie) {
+ zombieServers.remove(baseUrl);
+ }
+ listener.onSuccess(rsp);
+ }
+
+ @Override
+ public void onFailure(Throwable oe) {
+ try {
+ throw (Exception) oe;
+ } catch (BaseHttpSolrClient.RemoteExecutionException e){
+ listener.onFailure(e, false);
+ } catch(SolrException e) {
+ // we retry on 404 or 403 or 503 or 500
+ // unless it's an update - then we only retry on connect exception
+ if (!isNonRetryable && RETRY_CODES.contains(e.code())) {
+ listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
+ } else {
+ // Server is alive but the request was likely malformed or invalid
+ if (isZombie) {
+ zombieServers.remove(baseUrl);
+ }
+ listener.onFailure(e, false);
+ }
+ } catch (SocketException e) {
+ if (!isNonRetryable || e instanceof ConnectException) {
+ listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
+ } else {
+ listener.onFailure(e, false);
+ }
+ } catch (SocketTimeoutException e) {
+ if (!isNonRetryable) {
+ listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
+ } else {
+ listener.onFailure(e, false);
+ }
+ } catch (SolrServerException e) {
+ Throwable rootCause = e.getRootCause();
+ if (!isNonRetryable && rootCause instanceof IOException) {
+ listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
+ } else if (isNonRetryable && rootCause instanceof ConnectException) {
+ listener.onFailure((!isZombie) ? addZombie(baseUrl, e) : e, true);
+ } else {
+ listener.onFailure(e, false);
+ }
+ } catch (Exception e) {
+ listener.onFailure(new SolrServerException(e), false);
+ }
+ }
+ });
+ }
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
index 176f07d..6d5bf8c 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
@@ -24,10 +24,12 @@ import java.net.MalformedURLException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URL;
+import java.rmi.Remote;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -37,6 +39,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrClient;
@@ -60,7 +63,7 @@ import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
public abstract class LBSolrClient extends SolrClient {
// defaults
- private static final Set<Integer> RETRY_CODES = new HashSet<>(Arrays.asList(404, 403, 503, 500));
+ protected static final Set<Integer> RETRY_CODES = new HashSet<>(Arrays.asList(404, 403, 503, 500));
private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks
private static final int NONSTANDARD_PING_LIMIT = 5; // number of times we'll ping dead servers not in the server list
@@ -69,7 +72,7 @@ public abstract class LBSolrClient extends SolrClient {
private final Map<String, ServerWrapper> aliveServers = new LinkedHashMap<>();
// access to aliveServers should be synchronized on itself
- private final Map<String, ServerWrapper> zombieServers = new ConcurrentHashMap<>();
+ protected final Map<String, ServerWrapper> zombieServers = new ConcurrentHashMap<>();
// changes to aliveServers are reflected in this array, no need to synchronize
private volatile ServerWrapper[] aliveServerList = new ServerWrapper[0];
@@ -136,6 +139,91 @@ public abstract class LBSolrClient extends SolrClient {
}
}
+ protected static class ServerIterator {
+ String serverStr;
+ List<String> skipped;
+ int numServersTried;
+ Iterator<String> it;
+ Iterator<String> skippedIt;
+ String exceptionMessage;
+ long timeAllowedNano;
+ long timeOutTime;
+
+ final Map<String, ServerWrapper> zombieServers;
+ final Req req;
+
+ public ServerIterator(Req req, Map<String, ServerWrapper> zombieServers) {
+ this.it = req.getServers().iterator();
+ this.req = req;
+ this.zombieServers = zombieServers;
+ this.timeAllowedNano = getTimeAllowedInNanos(req.getRequest());
+ this.timeOutTime = System.nanoTime() + timeAllowedNano;
+ fetchNext();
+ }
+
+ public synchronized boolean hasNext() {
+ return serverStr != null;
+ }
+
+ private void fetchNext() {
+ serverStr = null;
+ if (req.numServersToTry != null && numServersTried > req.numServersToTry) {
+ exceptionMessage = "Time allowed to handle this request exceeded";
+ return;
+ }
+
+ while (it.hasNext()) {
+ serverStr = it.next();
+ serverStr = normalize(serverStr);
+ // if the server is currently a zombie, just skip to the next one
+ ServerWrapper wrapper = zombieServers.get(serverStr);
+ if (wrapper != null) {
+ final int numDeadServersToTry = req.getNumDeadServersToTry();
+ if (numDeadServersToTry > 0) {
+ if (skipped == null) {
+ skipped = new ArrayList<>(numDeadServersToTry);
+ skipped.add(wrapper.getBaseUrl());
+ } else if (skipped.size() < numDeadServersToTry) {
+ skipped.add(wrapper.getBaseUrl());
+ }
+ }
+ continue;
+ }
+
+ break;
+ }
+ if (serverStr == null && skipped != null) {
+ if (skippedIt == null) {
+ skippedIt = skipped.iterator();
+ }
+ if (skippedIt.hasNext()) {
+ serverStr = skippedIt.next();
+ }
+ }
+ }
+
+ boolean isServingZombieServer() {
+ return skippedIt != null;
+ }
+
+ public synchronized String nextOrError() throws SolrServerException {
+ if (isTimeExceeded(timeAllowedNano, timeOutTime)) {
+ throw new SolrServerException("Time allowed to handle this request exceeded");
+ }
+ if (serverStr == null) {
+ throw new SolrServerException("No live SolrServers available to handle this request");
+ }
+ numServersTried++;
+ if (req.getNumServersToTry() != null && numServersTried > req.getNumServersToTry()) {
+ throw new SolrServerException("No live SolrServers available to handle this request:"
+ + " numServersTried="+numServersTried
+ + " numServersToTry="+req.getNumServersToTry());
+ }
+ String rs = serverStr;
+ fetchNext();
+ return rs;
+ }
+ }
public static class Req {
protected SolrRequest request;
@@ -349,13 +437,13 @@ public abstract class LBSolrClient extends SolrClient {
/**
* @return time allowed in nanos, returns -1 if no time_allowed is specified.
*/
- private long getTimeAllowedInNanos(final SolrRequest req) {
+ private static long getTimeAllowedInNanos(final SolrRequest req) {
SolrParams reqParams = req.getParams();
return reqParams == null ? -1 :
TimeUnit.NANOSECONDS.convert(reqParams.getInt(CommonParams.TIME_ALLOWED, -1), TimeUnit.MILLISECONDS);
}
- private boolean isTimeExceeded(long timeAllowedNano, long timeOutTime) {
+ private static boolean isTimeExceeded(long timeAllowedNano, long timeOutTime) {
return timeAllowedNano > 0 && System.nanoTime() > timeOutTime;
}
@@ -413,7 +501,7 @@ public abstract class LBSolrClient extends SolrClient {
protected abstract SolrClient getClient(String baseUrl);
- private Exception addZombie(String serverStr, Exception e) {
+ protected Exception addZombie(String serverStr, Exception e) {
ServerWrapper wrapper = createServerWrapper(serverStr);
wrapper.standard = false;
zombieServers.put(serverStr, wrapper);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java b/solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java
new file mode 100644
index 0000000..323916a
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/util/Cancellable.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.util;
+
+public interface Cancellable {
+ void cancel();
+}
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/LBSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/LBSolrClientTest.java
new file mode 100644
index 0000000..1455294
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/LBSolrClientTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.lucene.util.RoaringDocIdSet;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class LBSolrClientTest {
+
+ @Test
+ public void testServerIterator() throws SolrServerException {
+ LBSolrClient.Req req = new LBSolrClient.Req(new QueryRequest(), Arrays.asList("1", "2", "3", "4"));
+ LBSolrClient.ServerIterator serverIterator = new LBSolrClient.ServerIterator(req, new HashMap<>());
+ List<String> actualServers = new ArrayList<>();
+ while (serverIterator.hasNext()) {
+ actualServers.add(serverIterator.nextOrError());
+ }
+ assertEquals(Arrays.asList("1", "2", "3", "4"), actualServers);
+ assertFalse(serverIterator.hasNext());
+ LuceneTestCase.expectThrows(SolrServerException.class, serverIterator::nextOrError);
+ }
+
+ @Test
+ public void testServerIteratorWithZombieServers() throws SolrServerException {
+ HashMap<String, LBSolrClient.ServerWrapper> zombieServers = new HashMap<>();
+ LBSolrClient.Req req = new LBSolrClient.Req(new QueryRequest(), Arrays.asList("1", "2", "3", "4"));
+ LBSolrClient.ServerIterator serverIterator = new LBSolrClient.ServerIterator(req, zombieServers);
+ zombieServers.put("2", new LBSolrClient.ServerWrapper("2"));
+
+ assertTrue(serverIterator.hasNext());
+ assertEquals("1", serverIterator.nextOrError());
+ assertTrue(serverIterator.hasNext());
+ assertEquals("3", serverIterator.nextOrError());
+ assertTrue(serverIterator.hasNext());
+ assertEquals("4", serverIterator.nextOrError());
+ assertTrue(serverIterator.hasNext());
+ assertEquals("2", serverIterator.nextOrError());
+ }
+
+ @Test
+ public void testServerIteratorTimeAllowed() throws SolrServerException, InterruptedException {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CommonParams.TIME_ALLOWED, 300);
+ LBSolrClient.Req req = new LBSolrClient.Req(new QueryRequest(params), Arrays.asList("1", "2", "3", "4"), 2);
+ LBSolrClient.ServerIterator serverIterator = new LBSolrClient.ServerIterator(req, new HashMap<>());
+ assertTrue(serverIterator.hasNext());
+ serverIterator.nextOrError();
+ Thread.sleep(300);
+ LuceneTestCase.expectThrows(SolrServerException.class, serverIterator::nextOrError);
+ }
+
+ @Test
+ public void testServerIteratorMaxRetry() throws SolrServerException {
+ LBSolrClient.Req req = new LBSolrClient.Req(new QueryRequest(), Arrays.asList("1", "2", "3", "4"), 2);
+ LBSolrClient.ServerIterator serverIterator = new LBSolrClient.ServerIterator(req, new HashMap<>());
+ assertTrue(serverIterator.hasNext());
+ serverIterator.nextOrError();
+ assertTrue(serverIterator.hasNext());
+ serverIterator.nextOrError();
+ LuceneTestCase.expectThrows(SolrServerException.class, serverIterator::nextOrError);
+ }
+}