You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/09/25 02:11:01 UTC
lucene-solr:jira/http2: Refactoring LBHttp2SolrClient and LBHttpSolrClient to remove duplicate code
Repository: lucene-solr
Updated Branches:
refs/heads/jira/http2 661774131 -> 65fb03591
Refactoring LBHttp2SolrClient and LBHttpSolrClient to remove duplicate code
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/65fb0359
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/65fb0359
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/65fb0359
Branch: refs/heads/jira/http2
Commit: 65fb03591fe6779683d4b559196809231d5fa3ef
Parents: 6617741
Author: Cao Manh Dat <da...@apache.org>
Authored: Tue Sep 25 09:10:46 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Tue Sep 25 09:10:46 2018 +0700
----------------------------------------------------------------------
.../handler/component/HttpShardHandler.java | 4 +-
.../component/HttpShardHandlerFactory.java | 11 +-
.../component/TestHttpShardHandlerFactory.java | 4 +-
.../client/solrj/impl/LBHttp2SolrClient.java | 630 +----------------
.../client/solrj/impl/LBHttpSolrClient.java | 685 ++----------------
.../solr/client/solrj/impl/LBSolrClient.java | 707 +++++++++++++++++++
6 files changed, 763 insertions(+), 1278 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/65fb0359/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
index 6f60ffc..12b47d7 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
@@ -35,7 +35,7 @@ import java.util.function.Predicate;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
-import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
+import org.apache.solr.client.solrj.impl.LBSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.cloud.CloudDescriptor;
@@ -169,7 +169,7 @@ public class HttpShardHandler extends ShardHandler {
req.setBasePath(url);
ssr.nl = httpClient.request(req);
} else {
- LBHttpSolrClient.Rsp rsp = httpShardHandlerFactory.makeLoadBalancedRequest(req, urls);
+ LBSolrClient.Rsp rsp = httpShardHandlerFactory.makeLoadBalancedRequest(req, urls);
ssr.nl = rsp.getResponse();
srsp.setShardAddress(rsp.getServer());
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/65fb0359/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
index a093517..f924000 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
@@ -36,14 +36,13 @@ import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.LBHttp2SolrClient;
-import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
+import org.apache.solr.client.solrj.impl.LBSolrClient;
import org.apache.solr.client.solrj.impl.SolrHttpClientBuilder;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
@@ -257,17 +256,17 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
* @param urls The list of solr server urls to load balance across
* @return The response from the request
*/
- public LBHttpSolrClient.Rsp makeLoadBalancedRequest(final QueryRequest req, List<String> urls)
+ public LBSolrClient.Rsp makeLoadBalancedRequest(final QueryRequest req, List<String> urls)
throws SolrServerException, IOException {
return loadbalancer.request(newLBHttpSolrClientReq(req, urls));
}
- protected LBHttpSolrClient.Req newLBHttpSolrClientReq(final QueryRequest req, List<String> urls) {
+ protected LBSolrClient.Req newLBHttpSolrClientReq(final QueryRequest req, List<String> urls) {
int numServersToTry = (int)Math.floor(urls.size() * this.permittedLoadBalancerRequestsMaximumFraction);
if (numServersToTry < this.permittedLoadBalancerRequestsMinimumAbsolute) {
numServersToTry = this.permittedLoadBalancerRequestsMinimumAbsolute;
}
- return new LBHttpSolrClient.Req(req, urls, numServersToTry);
+ return new LBSolrClient.Req(req, urls, numServersToTry);
}
/**
@@ -288,7 +287,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
}
/**
- * A distributed request is made via {@link LBHttpSolrClient} to the first live server in the URL list.
+ * A distributed request is made via {@link LBSolrClient} to the first live server in the URL list.
* This means it is just as likely to choose current host as any of the other hosts.
* This function makes sure that the cores are sorted according to the given list of preferences.
* E.g. If all nodes prefer local cores then a bad/heavily-loaded node will receive less requests from
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/65fb0359/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java b/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java
index 01aff97..34fc69f 100644
--- a/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java
+++ b/solr/core/src/test/org/apache/solr/handler/component/TestHttpShardHandlerFactory.java
@@ -22,8 +22,8 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.impl.LBSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ShardParams;
@@ -80,7 +80,7 @@ public class TestHttpShardHandlerFactory extends SolrTestCaseJ4 {
}
// create LBHttpSolrClient request
- final LBHttpSolrClient.Req req = httpShardHandlerFactory.newLBHttpSolrClientReq(queryRequest, urls);
+ final LBSolrClient.Req req = httpShardHandlerFactory.newLBHttpSolrClientReq(queryRequest, urls);
// actual vs. expected test
final int actualNumServersToTry = req.getNumServersToTry().intValue();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/65fb0359/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
index d34dfdb..4e84f65 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.java
@@ -16,46 +16,9 @@
*/
package org.apache.solr.client.solrj.impl;
-import java.io.IOException;
-import java.lang.ref.WeakReference;
-import java.net.ConnectException;
-import java.net.MalformedURLException;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Arrays;
-import org.apache.solr.client.solrj.ResponseParser;
import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteExecutionException;
-import org.apache.solr.client.solrj.impl.LBHttpSolrClient.Req;
-import org.apache.solr.client.solrj.impl.LBHttpSolrClient.Rsp;
-import org.apache.solr.client.solrj.request.IsUpdateRequest;
-import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.client.solrj.request.RequestWriter;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SolrjNamedThreadFactory;
-import org.slf4j.MDC;
-
-import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
/**
* LBHttp2SolrClient or "LoadBalanced LBHttp2SolrClient" is a load balancing wrapper around
@@ -91,596 +54,15 @@ import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
*
* @since solr 8.0
*/
-public class LBHttp2SolrClient extends SolrClient {
- private static Set<Integer> RETRY_CODES = new HashSet<>(4);
-
- static {
- RETRY_CODES.add(404);
- RETRY_CODES.add(403);
- RETRY_CODES.add(503);
- RETRY_CODES.add(500);
- }
-
- // keys to the maps are currently of the form "http://localhost:8983/solr"
- private final Map<String, ServerWrapper> aliveServers = new LinkedHashMap<>();
- // access to aliveServers should be synchronized on itself
-
- private final Map<String, ServerWrapper> zombieServers = new ConcurrentHashMap<>();
-
- // changes to aliveServers are reflected in this array, no need to synchronize
- private volatile ServerWrapper[] aliveServerList = new ServerWrapper[0];
-
-
- private ScheduledExecutorService aliveCheckExecutor;
-
- private final Http2SolrClient httpClient;
- private final AtomicInteger counter = new AtomicInteger(-1);
-
- private static final SolrQuery solrQuery = new SolrQuery("*:*");
- private volatile ResponseParser parser;
- private volatile RequestWriter requestWriter;
-
- private Set<String> queryParams = new HashSet<>();
-
- static {
- solrQuery.setRows(0);
- /**
- * Default sort (if we don't supply a sort) is by score and since
- * we request 0 rows any sorting and scoring is not necessary.
- * SolrQuery.DOCID schema-independently specifies a non-scoring sort.
- * <code>_docid_ asc</code> sort is efficient,
- * <code>_docid_ desc</code> sort is not, so choose ascending DOCID sort.
- */
- solrQuery.setSort(SolrQuery.DOCID, SolrQuery.ORDER.asc);
- // not a top-level request, we are interested only in the server being sent to i.e. it need not distribute our request to further servers
- solrQuery.setDistrib(false);
- }
-
- protected static class ServerWrapper {
-
- final String baseUrl;
-
- // "standard" servers are used by default. They normally live in the alive list
- // and move to the zombie list when unavailable. When they become available again,
- // they move back to the alive list.
- boolean standard = true;
-
- int failedPings = 0;
-
- ServerWrapper(String baseUrl) {
- this.baseUrl = baseUrl;
- }
-
- @Override
- public String toString() {
- return baseUrl;
- }
-
- @Override
- public int hashCode() {
- return baseUrl.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) return true;
- if (!(obj instanceof ServerWrapper)) return false;
- return baseUrl.equals(((ServerWrapper)obj).baseUrl);
- }
- }
+public class LBHttp2SolrClient extends LBSolrClient {
+ private Http2SolrClient httpClient;
public LBHttp2SolrClient(Http2SolrClient httpClient, String... baseSolrUrls) {
+ super(Arrays.asList(baseSolrUrls));
this.httpClient = httpClient;
- if (baseSolrUrls != null) {
- for (String s : baseSolrUrls) {
- ServerWrapper wrapper = new ServerWrapper(s);
- aliveServers.put(s, wrapper);
- }
- }
- updateAliveList();
- }
-
- public Set<String> getQueryParams() {
- return queryParams;
- }
-
- /**
- * Expert Method.
- * @param queryParams set of param keys to only send via the query string
- */
- public void setQueryParams(Set<String> queryParams) {
- this.queryParams = queryParams;
- }
- public void addQueryParams(String queryOnlyParam) {
- this.queryParams.add(queryOnlyParam) ;
- }
-
- public static String normalize(String server) {
- if (server.endsWith("/"))
- server = server.substring(0, server.length() - 1);
- return server;
- }
-
- /**
- * Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped.
- * If a request fails due to an IOException, the server is moved to the dead pool for a certain period of
- * time, or until a test request on that server succeeds.
- *
- * Servers are queried in the exact order given (except servers currently in the dead pool are skipped).
- * If no live servers from the provided list remain to be tried, a number of previously skipped dead servers will be tried.
- * Req.getNumDeadServersToTry() controls how many dead servers will be tried.
- *
- * If no live servers are found a SolrServerException is thrown.
- *
- * @param req contains both the request as well as the list of servers to query
- *
- * @return the result of the request
- *
- * @throws IOException If there is a low-level I/O error.
- */
- public Rsp request(Req req) throws SolrServerException, IOException {
- Rsp rsp = new Rsp();
- Exception ex = null;
- boolean isNonRetryable = req.request instanceof IsUpdateRequest || ADMIN_PATHS.contains(req.request.getPath());
- List<ServerWrapper> skipped = null;
-
- final Integer numServersToTry = req.getNumServersToTry();
- int numServersTried = 0;
-
- boolean timeAllowedExceeded = false;
- long timeAllowedNano = getTimeAllowedInNanos(req.getRequest());
- long timeOutTime = System.nanoTime() + timeAllowedNano;
- for (String serverStr : req.getServers()) {
- if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) {
- break;
- }
-
- serverStr = normalize(serverStr);
- // if the server is currently a zombie, just skip to the next one
- ServerWrapper wrapper = zombieServers.get(serverStr);
- if (wrapper != null) {
- // System.out.println("ZOMBIE SERVER QUERIED: " + serverStr);
- final int numDeadServersToTry = req.getNumDeadServersToTry();
- if (numDeadServersToTry > 0) {
- if (skipped == null) {
- skipped = new ArrayList<>(numDeadServersToTry);
- skipped.add(wrapper);
- }
- else if (skipped.size() < numDeadServersToTry) {
- skipped.add(wrapper);
- }
- }
- continue;
- }
- try {
- MDC.put("LBHttp2SolrClient.url", serverStr);
-
- if (numServersToTry != null && numServersTried > numServersToTry) {
- break;
- }
-
- ++numServersTried;
- ex = doRequest(serverStr, req, rsp, isNonRetryable, false);
- if (ex == null) {
- return rsp; // SUCCESS
- }
- } finally {
- MDC.remove("LBHttp2SolrClient.url");
- }
- }
-
- // try the servers we previously skipped
- if (skipped != null) {
- for (ServerWrapper wrapper : skipped) {
- if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) {
- break;
- }
-
- if (numServersToTry != null && numServersTried > numServersToTry) {
- break;
- }
-
- try {
- MDC.put("LBHttp2SolrClient.url", wrapper.baseUrl);
- ++numServersTried;
- ex = doRequest(wrapper.baseUrl, req, rsp, isNonRetryable, true);
- if (ex == null) {
- return rsp; // SUCCESS
- }
- } finally {
- MDC.remove("LBHttp2SolrClient.url");
- }
- }
- }
-
-
- final String solrServerExceptionMessage;
- if (timeAllowedExceeded) {
- solrServerExceptionMessage = "Time allowed to handle this request exceeded";
- } else {
- if (numServersToTry != null && numServersTried > numServersToTry) {
- solrServerExceptionMessage = "No live SolrServers available to handle this request:"
- + " numServersTried="+numServersTried
- + " numServersToTry="+ numServersToTry;
- } else {
- solrServerExceptionMessage = "No live SolrServers available to handle this request";
- }
- }
- if (ex == null) {
- throw new SolrServerException(solrServerExceptionMessage);
- } else {
- throw new SolrServerException(solrServerExceptionMessage+":" + zombieServers.keySet(), ex);
- }
-
- }
-
- private Exception addZombie(String serverStr, Exception e) {
-
- ServerWrapper wrapper;
-
- wrapper = new ServerWrapper(serverStr);
- wrapper.standard = false;
- zombieServers.put(serverStr, wrapper);
- startAliveCheckExecutor();
- return e;
- }
-
- private Exception doRequest(String serverStr, Req req, Rsp rsp, boolean isNonRetryable,
- boolean isZombie) throws SolrServerException, IOException {
- Exception ex = null;
- try {
- rsp.server = serverStr;
- req.getRequest().setBasePath(serverStr);
- rsp.rsp = httpClient.request(req.getRequest(), null);
- if (isZombie) {
- zombieServers.remove(serverStr);
- }
- } catch (RemoteExecutionException e){
- throw e;
- } catch(SolrException e) {
- // we retry on 404 or 403 or 503 or 500
- // unless it's an update - then we only retry on connect exception
- if (!isNonRetryable && RETRY_CODES.contains(e.code())) {
- ex = (!isZombie) ? addZombie(serverStr, e) : e;
- } else {
- // Server is alive but the request was likely malformed or invalid
- if (isZombie) {
- zombieServers.remove(serverStr);
- }
- throw e;
- }
- } catch (SocketException e) {
- if (!isNonRetryable || e instanceof ConnectException) {
- ex = (!isZombie) ? addZombie(serverStr, e) : e;
- } else {
- throw e;
- }
- } catch (SocketTimeoutException e) {
- if (!isNonRetryable) {
- ex = (!isZombie) ? addZombie(serverStr, e) : e;
- } else {
- throw e;
- }
- } catch (SolrServerException e) {
- Throwable rootCause = e.getRootCause();
- if (!isNonRetryable && rootCause instanceof IOException) {
- ex = (!isZombie) ? addZombie(serverStr, e) : e;
- } else if (isNonRetryable && rootCause instanceof ConnectException) {
- ex = (!isZombie) ? addZombie(serverStr, e) : e;
- } else {
- throw e;
- }
- } catch (Exception e) {
- throw new SolrServerException(e);
- }
-
- return ex;
- }
-
- private void updateAliveList() {
- synchronized (aliveServers) {
- aliveServerList = aliveServers.values().toArray(new ServerWrapper[0]);
- }
}
-
- private ServerWrapper removeFromAlive(String key) {
- synchronized (aliveServers) {
- ServerWrapper wrapper = aliveServers.remove(key);
- if (wrapper != null)
- updateAliveList();
- return wrapper;
- }
- }
-
- private void addToAlive(ServerWrapper wrapper) {
- synchronized (aliveServers) {
- ServerWrapper prev = aliveServers.put(wrapper.baseUrl, wrapper);
- // TODO: warn if there was a previous entry?
- updateAliveList();
- }
- }
-
- public void addSolrServer(String server) throws MalformedURLException {
- addToAlive(new ServerWrapper(server));
- }
-
- public String removeSolrServer(String server) {
- try {
- server = new URL(server).toExternalForm();
- } catch (MalformedURLException e) {
- throw new RuntimeException(e);
- }
- if (server.endsWith("/")) {
- server = server.substring(0, server.length() - 1);
- }
-
- // there is a small race condition here - if the server is in the process of being moved between
- // lists, we could fail to remove it.
- removeFromAlive(server);
- zombieServers.remove(server);
- return null;
- }
-
-
- @Override
- public void close() {
- if (aliveCheckExecutor != null) {
- aliveCheckExecutor.shutdownNow();
- }
- }
-
- /**
- * Tries to query a live server. A SolrServerException is thrown if all servers are dead.
- * If the request failed due to IOException then the live server is moved to dead pool and the request is
- * retried on another live server. After live servers are exhausted, any servers previously marked as dead
- * will be tried before failing the request.
- *
- * @param request the SolrRequest.
- *
- * @return response
- *
- * @throws IOException If there is a low-level I/O error.
- */
- @Override
- public NamedList<Object> request(final SolrRequest request, String collection)
- throws SolrServerException, IOException {
- return request(request, collection, null);
- }
-
- public NamedList<Object> request(final SolrRequest request, String collection,
- final Integer numServersToTry) throws SolrServerException, IOException {
- Exception ex = null;
- ServerWrapper[] serverList = aliveServerList;
-
- final int maxTries = (numServersToTry == null ? serverList.length : numServersToTry);
- int numServersTried = 0;
- Map<String,ServerWrapper> justFailed = null;
-
- boolean timeAllowedExceeded = false;
- long timeAllowedNano = getTimeAllowedInNanos(request);
- long timeOutTime = System.nanoTime() + timeAllowedNano;
- for (int attempts=0; attempts<maxTries; attempts++) {
- if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) {
- break;
- }
-
- int count = counter.incrementAndGet() & Integer.MAX_VALUE;
- ServerWrapper wrapper = serverList[count % serverList.length];
-
- try {
- ++numServersTried;
- request.setBasePath(wrapper.baseUrl);
- return httpClient.request(request, collection);
- } catch (SolrException e) {
- // Server is alive but the request was malformed or invalid
- throw e;
- } catch (SolrServerException e) {
- if (e.getRootCause() instanceof IOException) {
- ex = e;
- moveAliveToDead(wrapper);
- if (justFailed == null) justFailed = new HashMap<>();
- justFailed.put(wrapper.baseUrl, wrapper);
- } else {
- throw e;
- }
- } catch (Exception e) {
- throw new SolrServerException(e);
- }
- }
-
- // try other standard servers that we didn't try just now
- for (ServerWrapper wrapper : zombieServers.values()) {
- if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) {
- break;
- }
-
- if (!wrapper.standard || justFailed!=null && justFailed.containsKey(wrapper.baseUrl)) continue;
- try {
- ++numServersTried;
- request.setBasePath(wrapper.baseUrl);
- NamedList<Object> rsp = httpClient.request(request, collection);
- // remove from zombie list *before* adding to alive to avoid a race that could lose a server
- zombieServers.remove(wrapper.baseUrl);
- addToAlive(wrapper);
- return rsp;
- } catch (SolrException e) {
- // Server is alive but the request was malformed or invalid
- throw e;
- } catch (SolrServerException e) {
- if (e.getRootCause() instanceof IOException) {
- ex = e;
- // still dead
- } else {
- throw e;
- }
- } catch (Exception e) {
- throw new SolrServerException(e);
- }
- }
-
-
- final String solrServerExceptionMessage;
- if (timeAllowedExceeded) {
- solrServerExceptionMessage = "Time allowed to handle this request exceeded";
- } else {
- if (numServersToTry != null && numServersTried > numServersToTry) {
- solrServerExceptionMessage = "No live SolrServers available to handle this request:"
- + " numServersTried="+numServersTried
- + " numServersToTry="+ numServersToTry;
- } else {
- solrServerExceptionMessage = "No live SolrServers available to handle this request";
- }
- }
- if (ex == null) {
- throw new SolrServerException(solrServerExceptionMessage);
- } else {
- throw new SolrServerException(solrServerExceptionMessage, ex);
- }
- }
-
- /**
- * @return time allowed in nanos, returns -1 if no time_allowed is specified.
- */
- private long getTimeAllowedInNanos(final SolrRequest req) {
- SolrParams reqParams = req.getParams();
- return reqParams == null ? -1 :
- TimeUnit.NANOSECONDS.convert(reqParams.getInt(CommonParams.TIME_ALLOWED, -1), TimeUnit.MILLISECONDS);
- }
-
- private boolean isTimeExceeded(long timeAllowedNano, long timeOutTime) {
- return timeAllowedNano > 0 && System.nanoTime() > timeOutTime;
- }
-
- /**
- * Takes up one dead server and check for aliveness. The check is done in a roundrobin. Each server is checked for
- * aliveness once in 'x' millis where x is decided by the setAliveCheckinterval() or it is defaulted to 1 minute
- *
- * @param zombieServer a server in the dead pool
- */
- private void checkAZombieServer(ServerWrapper zombieServer) {
- try {
- QueryRequest queryRequest = new QueryRequest(solrQuery);
- queryRequest.setBasePath(zombieServer.baseUrl);
- QueryResponse resp = queryRequest.process(httpClient);
- if (resp.getStatus() == 0) {
- // server has come back up.
- // make sure to remove from zombies before adding to alive to avoid a race condition
- // where another thread could mark it down, move it back to zombie, and then we delete
- // from zombie and lose it forever.
- ServerWrapper wrapper = zombieServers.remove(zombieServer.baseUrl);
- if (wrapper != null) {
- wrapper.failedPings = 0;
- if (wrapper.standard) {
- addToAlive(wrapper);
- }
- } else {
- // something else already moved the server from zombie to alive
- }
- }
- } catch (Exception e) {
- //Expected. The server is still down.
- zombieServer.failedPings++;
-
- // If the server doesn't belong in the standard set belonging to this load balancer
- // then simply drop it after a certain number of failed pings.
- if (!zombieServer.standard && zombieServer.failedPings >= NONSTANDARD_PING_LIMIT) {
- zombieServers.remove(zombieServer.baseUrl);
- }
- }
- }
-
- private void moveAliveToDead(ServerWrapper wrapper) {
- wrapper = removeFromAlive(wrapper.baseUrl);
- if (wrapper == null)
- return; // another thread already detected the failure and removed it
- zombieServers.put(wrapper.baseUrl, wrapper);
- startAliveCheckExecutor();
- }
-
- private int interval = CHECK_INTERVAL;
-
- /**
- * LBHttp2SolrClient keeps pinging the dead servers at fixed interval to find if it is alive. Use this to set that
- * interval
- *
- * @param interval time in milliseconds
- */
- public void setAliveCheckInterval(int interval) {
- if (interval <= 0) {
- throw new IllegalArgumentException("Alive check interval must be " +
- "positive, specified value = " + interval);
- }
- this.interval = interval;
- }
-
- private void startAliveCheckExecutor() {
- // double-checked locking, but it's OK because we don't *do* anything with aliveCheckExecutor
- // if it's not null.
- if (aliveCheckExecutor == null) {
- synchronized (this) {
- if (aliveCheckExecutor == null) {
- aliveCheckExecutor = Executors.newSingleThreadScheduledExecutor(
- new SolrjNamedThreadFactory("aliveCheckExecutor"));
- aliveCheckExecutor.scheduleAtFixedRate(
- getAliveCheckRunner(new WeakReference<>(this)),
- this.interval, this.interval, TimeUnit.MILLISECONDS);
- }
- }
- }
- }
-
- private static Runnable getAliveCheckRunner(final WeakReference<LBHttp2SolrClient> lbRef) {
- return () -> {
- LBHttp2SolrClient lb = lbRef.get();
- if (lb != null) {
- for (ServerWrapper zombieServer : lb.zombieServers.values()) {
- lb.checkAZombieServer(zombieServer);
- }
- }
- };
- }
-
- public ResponseParser getParser() {
- return parser;
- }
-
- /**
- * Changes the {@link ResponseParser} that will be used for the internal
- * SolrServer objects.
- *
- * @param parser Default Response Parser chosen to parse the response if the parser
- * were not specified as part of the request.
- * @see SolrRequest#getResponseParser()
- */
- public void setParser(ResponseParser parser) {
- this.parser = parser;
- }
-
- /**
- * Changes the {@link RequestWriter} that will be used for the internal
- * SolrServer objects.
- *
- * @param requestWriter Default RequestWriter, used to encode requests sent to the server.
- */
- public void setRequestWriter(RequestWriter requestWriter) {
- this.requestWriter = requestWriter;
- }
-
- public RequestWriter getRequestWriter() {
- return requestWriter;
- }
-
@Override
- protected void finalize() throws Throwable {
- try {
- if(this.aliveCheckExecutor!=null)
- this.aliveCheckExecutor.shutdownNow();
- } finally {
- super.finalize();
- }
+ protected SolrClient getClient(String baseUrl) {
+ return httpClient;
}
-
- // defaults
- private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks
- private static final int NONSTANDARD_PING_LIMIT = 5; // number of times we'll ping dead servers not in the server list
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/65fb0359/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java
index 6c2737d..af4f812 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBHttpSolrClient.java
@@ -94,147 +94,36 @@ import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
*
* @since solr 1.4
*/
-public class LBHttpSolrClient extends SolrClient {
- private static Set<Integer> RETRY_CODES = new HashSet<>(4);
-
- static {
- RETRY_CODES.add(404);
- RETRY_CODES.add(403);
- RETRY_CODES.add(503);
- RETRY_CODES.add(500);
- }
-
- // keys to the maps are currently of the form "http://localhost:8983/solr"
- // which should be equivalent to HttpSolrServer.getBaseURL()
- private final Map<String, ServerWrapper> aliveServers = new LinkedHashMap<>();
- // access to aliveServers should be synchronized on itself
-
- protected final Map<String, ServerWrapper> zombieServers = new ConcurrentHashMap<>();
-
- // changes to aliveServers are reflected in this array, no need to synchronize
- private volatile ServerWrapper[] aliveServerList = new ServerWrapper[0];
-
-
- private ScheduledExecutorService aliveCheckExecutor;
+public class LBHttpSolrClient extends LBSolrClient {
private final HttpClient httpClient;
private final boolean clientIsInternal;
+ private final ConcurrentHashMap<String, HttpSolrClient> urlToClient = new ConcurrentHashMap<>();
private HttpSolrClient.Builder httpSolrClientBuilder;
- private final AtomicInteger counter = new AtomicInteger(-1);
-
- private static final SolrQuery solrQuery = new SolrQuery("*:*");
- private volatile ResponseParser parser;
- private volatile RequestWriter requestWriter;
- private Set<String> queryParams = new HashSet<>();
private Integer connectionTimeout;
-
private Integer soTimeout;
- static {
- solrQuery.setRows(0);
- /**
- * Default sort (if we don't supply a sort) is by score and since
- * we request 0 rows any sorting and scoring is not necessary.
- * SolrQuery.DOCID schema-independently specifies a non-scoring sort.
- * <code>_docid_ asc</code> sort is efficient,
- * <code>_docid_ desc</code> sort is not, so choose ascending DOCID sort.
- */
- solrQuery.setSort(SolrQuery.DOCID, SolrQuery.ORDER.asc);
- // not a top-level request, we are interested only in the server being sent to i.e. it need not distribute our request to further servers
- solrQuery.setDistrib(false);
- }
-
- protected static class ServerWrapper {
-
- final HttpSolrClient client;
-
- // "standard" servers are used by default. They normally live in the alive list
- // and move to the zombie list when unavailable. When they become available again,
- // they move back to the alive list.
- boolean standard = true;
-
- int failedPings = 0;
-
- public ServerWrapper(HttpSolrClient client) {
- this.client = client;
- }
-
- @Override
- public String toString() {
- return client.getBaseURL();
- }
-
- public String getKey() {
- return client.getBaseURL();
- }
-
- @Override
- public int hashCode() {
- return this.getKey().hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) return true;
- if (!(obj instanceof ServerWrapper)) return false;
- return this.getKey().equals(((ServerWrapper)obj).getKey());
- }
- }
-
- public static class Req {
- protected SolrRequest request;
- protected List<String> servers;
- protected int numDeadServersToTry;
- private final Integer numServersToTry;
-
+ /**
+ * @deprecated use {@link LBSolrClient.Req} instead
+ */
+ @Deprecated
+ public static class Req extends LBSolrClient.Req {
public Req(SolrRequest request, List<String> servers) {
- this(request, servers, null);
+ super(request, servers);
}
public Req(SolrRequest request, List<String> servers, Integer numServersToTry) {
- this.request = request;
- this.servers = servers;
- this.numDeadServersToTry = servers.size();
- this.numServersToTry = numServersToTry;
- }
-
- public SolrRequest getRequest() {
- return request;
- }
- public List<String> getServers() {
- return servers;
- }
-
- /** @return the number of dead servers to try if there are no live servers left */
- public int getNumDeadServersToTry() {
- return numDeadServersToTry;
- }
-
- /** @param numDeadServersToTry The number of dead servers to try if there are no live servers left.
- * Defaults to the number of servers in this request. */
- public void setNumDeadServersToTry(int numDeadServersToTry) {
- this.numDeadServersToTry = numDeadServersToTry;
- }
-
- public Integer getNumServersToTry() {
- return numServersToTry;
+ super(request, servers, numServersToTry);
}
}
- public static class Rsp {
- protected String server;
- protected NamedList<Object> rsp;
-
- /** The response from the server */
- public NamedList<Object> getResponse() {
- return rsp;
- }
+ /**
+ * @deprecated use {@link LBSolrClient.Rsp} instead
+ */
+ @Deprecated
+ public static class Rsp extends LBSolrClient.Rsp {
- /** The server that returned the response */
- public String getServer() {
- return server;
- }
}
/**
@@ -265,20 +154,13 @@ public class LBHttpSolrClient extends SolrClient {
}
protected LBHttpSolrClient(Builder builder) {
+ super(builder.baseSolrUrls);
this.clientIsInternal = builder.httpClient == null;
this.httpSolrClientBuilder = builder.httpSolrClientBuilder;
this.httpClient = builder.httpClient == null ? constructClient(builder.baseSolrUrls.toArray(new String[builder.baseSolrUrls.size()])) : builder.httpClient;
this.connectionTimeout = builder.connectionTimeoutMillis;
this.soTimeout = builder.socketTimeoutMillis;
this.parser = builder.responseParser;
-
- if (! builder.baseSolrUrls.isEmpty()) {
- for (String s : builder.baseSolrUrls) {
- ServerWrapper wrapper = new ServerWrapper(makeSolrClient(s));
- aliveServers.put(wrapper.getKey(), wrapper);
- }
- }
- updateAliveList();
}
private HttpClient constructClient(String[] solrServerUrl) {
@@ -292,27 +174,6 @@ public class LBHttpSolrClient extends SolrClient {
return HttpClientUtil.createClient(params);
}
- public Set<String> getQueryParams() {
- return queryParams;
- }
-
- /**
- * Expert Method.
- * @param queryParams set of param keys to only send via the query string
- */
- public void setQueryParams(Set<String> queryParams) {
- this.queryParams = queryParams;
- }
- public void addQueryParams(String queryOnlyParam) {
- this.queryParams.add(queryOnlyParam) ;
- }
-
- public static String normalize(String server) {
- if (server.endsWith("/"))
- server = server.substring(0, server.length() - 1);
- return server;
- }
-
protected HttpSolrClient makeSolrClient(String server) {
HttpSolrClient client;
if (httpSolrClientBuilder != null) {
@@ -350,243 +211,12 @@ public class LBHttpSolrClient extends SolrClient {
}
/**
- * Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped.
- * If a request fails due to an IOException, the server is moved to the dead pool for a certain period of
- * time, or until a test request on that server succeeds.
- *
- * Servers are queried in the exact order given (except servers currently in the dead pool are skipped).
- * If no live servers from the provided list remain to be tried, a number of previously skipped dead servers will be tried.
- * Req.getNumDeadServersToTry() controls how many dead servers will be tried.
- *
- * If no live servers are found a SolrServerException is thrown.
- *
- * @param req contains both the request as well as the list of servers to query
- *
- * @return the result of the request
- *
- * @throws IOException If there is a low-level I/O error.
- */
- public Rsp request(Req req) throws SolrServerException, IOException {
- Rsp rsp = new Rsp();
- Exception ex = null;
- boolean isNonRetryable = req.request instanceof IsUpdateRequest || ADMIN_PATHS.contains(req.request.getPath());
- List<ServerWrapper> skipped = null;
-
- final Integer numServersToTry = req.getNumServersToTry();
- int numServersTried = 0;
-
- boolean timeAllowedExceeded = false;
- long timeAllowedNano = getTimeAllowedInNanos(req.getRequest());
- long timeOutTime = System.nanoTime() + timeAllowedNano;
- for (String serverStr : req.getServers()) {
- if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) {
- break;
- }
-
- serverStr = normalize(serverStr);
- // if the server is currently a zombie, just skip to the next one
- ServerWrapper wrapper = zombieServers.get(serverStr);
- if (wrapper != null) {
- // System.out.println("ZOMBIE SERVER QUERIED: " + serverStr);
- final int numDeadServersToTry = req.getNumDeadServersToTry();
- if (numDeadServersToTry > 0) {
- if (skipped == null) {
- skipped = new ArrayList<>(numDeadServersToTry);
- skipped.add(wrapper);
- }
- else if (skipped.size() < numDeadServersToTry) {
- skipped.add(wrapper);
- }
- }
- continue;
- }
- try {
- MDC.put("LBHttpSolrClient.url", serverStr);
-
- if (numServersToTry != null && numServersTried > numServersToTry.intValue()) {
- break;
- }
-
- HttpSolrClient client = makeSolrClient(serverStr);
-
- ++numServersTried;
- ex = doRequest(client, req, rsp, isNonRetryable, false, null);
- if (ex == null) {
- return rsp; // SUCCESS
- }
- } finally {
- MDC.remove("LBHttpSolrClient.url");
- }
- }
-
- // try the servers we previously skipped
- if (skipped != null) {
- for (ServerWrapper wrapper : skipped) {
- if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) {
- break;
- }
-
- if (numServersToTry != null && numServersTried > numServersToTry.intValue()) {
- break;
- }
-
- try {
- MDC.put("LBHttpSolrClient.url", wrapper.client.getBaseURL());
- ++numServersTried;
- ex = doRequest(wrapper.client, req, rsp, isNonRetryable, true, wrapper.getKey());
- if (ex == null) {
- return rsp; // SUCCESS
- }
- } finally {
- MDC.remove("LBHttpSolrClient.url");
- }
- }
- }
-
-
- final String solrServerExceptionMessage;
- if (timeAllowedExceeded) {
- solrServerExceptionMessage = "Time allowed to handle this request exceeded";
- } else {
- if (numServersToTry != null && numServersTried > numServersToTry.intValue()) {
- solrServerExceptionMessage = "No live SolrServers available to handle this request:"
- + " numServersTried="+numServersTried
- + " numServersToTry="+numServersToTry.intValue();
- } else {
- solrServerExceptionMessage = "No live SolrServers available to handle this request";
- }
- }
- if (ex == null) {
- throw new SolrServerException(solrServerExceptionMessage);
- } else {
- throw new SolrServerException(solrServerExceptionMessage+":" + zombieServers.keySet(), ex);
- }
-
- }
-
- protected Exception addZombie(HttpSolrClient server, Exception e) {
-
- ServerWrapper wrapper;
-
- wrapper = new ServerWrapper(server);
- wrapper.standard = false;
- zombieServers.put(wrapper.getKey(), wrapper);
- startAliveCheckExecutor();
- return e;
- }
-
- protected Exception doRequest(HttpSolrClient client, Req req, Rsp rsp, boolean isNonRetryable,
- boolean isZombie, String zombieKey) throws SolrServerException, IOException {
- Exception ex = null;
- try {
- rsp.server = client.getBaseURL();
- rsp.rsp = client.request(req.getRequest(), (String) null);
- if (isZombie) {
- zombieServers.remove(zombieKey);
- }
- } catch (RemoteExecutionException e){
- throw e;
- } catch(SolrException e) {
- // we retry on 404 or 403 or 503 or 500
- // unless it's an update - then we only retry on connect exception
- if (!isNonRetryable && RETRY_CODES.contains(e.code())) {
- ex = (!isZombie) ? addZombie(client, e) : e;
- } else {
- // Server is alive but the request was likely malformed or invalid
- if (isZombie) {
- zombieServers.remove(zombieKey);
- }
- throw e;
- }
- } catch (SocketException e) {
- if (!isNonRetryable || e instanceof ConnectException) {
- ex = (!isZombie) ? addZombie(client, e) : e;
- } else {
- throw e;
- }
- } catch (SocketTimeoutException e) {
- if (!isNonRetryable) {
- ex = (!isZombie) ? addZombie(client, e) : e;
- } else {
- throw e;
- }
- } catch (SolrServerException e) {
- Throwable rootCause = e.getRootCause();
- if (!isNonRetryable && rootCause instanceof IOException) {
- ex = (!isZombie) ? addZombie(client, e) : e;
- } else if (isNonRetryable && rootCause instanceof ConnectException) {
- ex = (!isZombie) ? addZombie(client, e) : e;
- } else {
- throw e;
- }
- } catch (Exception e) {
- throw new SolrServerException(e);
- }
-
- return ex;
- }
-
- private void updateAliveList() {
- synchronized (aliveServers) {
- aliveServerList = aliveServers.values().toArray(new ServerWrapper[aliveServers.size()]);
- }
- }
-
- private ServerWrapper removeFromAlive(String key) {
- synchronized (aliveServers) {
- ServerWrapper wrapper = aliveServers.remove(key);
- if (wrapper != null)
- updateAliveList();
- return wrapper;
- }
- }
-
- private void addToAlive(ServerWrapper wrapper) {
- synchronized (aliveServers) {
- ServerWrapper prev = aliveServers.put(wrapper.getKey(), wrapper);
- // TODO: warn if there was a previous entry?
- updateAliveList();
- }
- }
-
- public void addSolrServer(String server) throws MalformedURLException {
- HttpSolrClient client = makeSolrClient(server);
- addToAlive(new ServerWrapper(client));
- }
-
- public String removeSolrServer(String server) {
- try {
- server = new URL(server).toExternalForm();
- } catch (MalformedURLException e) {
- throw new RuntimeException(e);
- }
- if (server.endsWith("/")) {
- server = server.substring(0, server.length() - 1);
- }
-
- // there is a small race condition here - if the server is in the process of being moved between
- // lists, we could fail to remove it.
- removeFromAlive(server);
- zombieServers.remove(server);
- return null;
- }
-
- /**
* @deprecated since 7.0 Use {@link Builder} methods instead.
*/
@Deprecated
public void setConnectionTimeout(int timeout) {
this.connectionTimeout = timeout;
- synchronized (aliveServers) {
- Iterator<ServerWrapper> wrappersIt = aliveServers.values().iterator();
- while (wrappersIt.hasNext()) {
- wrappersIt.next().client.setConnectionTimeout(timeout);
- }
- }
- Iterator<ServerWrapper> wrappersIt = zombieServers.values().iterator();
- while (wrappersIt.hasNext()) {
- wrappersIt.next().client.setConnectionTimeout(timeout);
- }
+ this.urlToClient.values().forEach(client -> client.setConnectionTimeout(timeout));
}
/**
@@ -598,235 +228,46 @@ public class LBHttpSolrClient extends SolrClient {
@Deprecated
public void setSoTimeout(int timeout) {
this.soTimeout = timeout;
- synchronized (aliveServers) {
- Iterator<ServerWrapper> wrappersIt = aliveServers.values().iterator();
- while (wrappersIt.hasNext()) {
- wrappersIt.next().client.setSoTimeout(timeout);
- }
- }
- Iterator<ServerWrapper> wrappersIt = zombieServers.values().iterator();
- while (wrappersIt.hasNext()) {
- wrappersIt.next().client.setSoTimeout(timeout);
- }
- }
-
- @Override
- public void close() {
- if (aliveCheckExecutor != null) {
- aliveCheckExecutor.shutdownNow();
- }
- if(clientIsInternal) {
- HttpClientUtil.close(httpClient);
- }
+ this.urlToClient.values().forEach(client -> client.setSoTimeout(timeout));
}
/**
- * Tries to query a live server. A SolrServerException is thrown if all servers are dead.
- * If the request failed due to IOException then the live server is moved to dead pool and the request is
- * retried on another live server. After live servers are exhausted, any servers previously marked as dead
- * will be tried before failing the request.
- *
- * @param request the SolrRequest.
- *
- * @return response
- *
- * @throws IOException If there is a low-level I/O error.
+ * @deprecated use {@link LBSolrClient#request(LBSolrClient.Req)} instead
*/
- @Override
- public NamedList<Object> request(final SolrRequest request, String collection)
- throws SolrServerException, IOException {
- return request(request, collection, null);
- }
-
- public NamedList<Object> request(final SolrRequest request, String collection,
- final Integer numServersToTry) throws SolrServerException, IOException {
- Exception ex = null;
- ServerWrapper[] serverList = aliveServerList;
-
- final int maxTries = (numServersToTry == null ? serverList.length : numServersToTry.intValue());
- int numServersTried = 0;
- Map<String,ServerWrapper> justFailed = null;
-
- boolean timeAllowedExceeded = false;
- long timeAllowedNano = getTimeAllowedInNanos(request);
- long timeOutTime = System.nanoTime() + timeAllowedNano;
- for (int attempts=0; attempts<maxTries; attempts++) {
- if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) {
- break;
- }
-
- int count = counter.incrementAndGet() & Integer.MAX_VALUE;
- ServerWrapper wrapper = serverList[count % serverList.length];
-
- try {
- ++numServersTried;
- return wrapper.client.request(request, collection);
- } catch (SolrException e) {
- // Server is alive but the request was malformed or invalid
- throw e;
- } catch (SolrServerException e) {
- if (e.getRootCause() instanceof IOException) {
- ex = e;
- moveAliveToDead(wrapper);
- if (justFailed == null) justFailed = new HashMap<>();
- justFailed.put(wrapper.getKey(), wrapper);
- } else {
- throw e;
- }
- } catch (Exception e) {
- throw new SolrServerException(e);
- }
- }
-
- // try other standard servers that we didn't try just now
- for (ServerWrapper wrapper : zombieServers.values()) {
- if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) {
- break;
- }
-
- if (wrapper.standard==false || justFailed!=null && justFailed.containsKey(wrapper.getKey())) continue;
- try {
- ++numServersTried;
- NamedList<Object> rsp = wrapper.client.request(request, collection);
- // remove from zombie list *before* adding to alive to avoid a race that could lose a server
- zombieServers.remove(wrapper.getKey());
- addToAlive(wrapper);
- return rsp;
- } catch (SolrException e) {
- // Server is alive but the request was malformed or invalid
- throw e;
- } catch (SolrServerException e) {
- if (e.getRootCause() instanceof IOException) {
- ex = e;
- // still dead
- } else {
- throw e;
- }
- } catch (Exception e) {
- throw new SolrServerException(e);
- }
- }
-
-
- final String solrServerExceptionMessage;
- if (timeAllowedExceeded) {
- solrServerExceptionMessage = "Time allowed to handle this request exceeded";
- } else {
- if (numServersToTry != null && numServersTried > numServersToTry.intValue()) {
- solrServerExceptionMessage = "No live SolrServers available to handle this request:"
- + " numServersTried="+numServersTried
- + " numServersToTry="+numServersToTry.intValue();
- } else {
- solrServerExceptionMessage = "No live SolrServers available to handle this request";
- }
- }
- if (ex == null) {
- throw new SolrServerException(solrServerExceptionMessage);
- } else {
- throw new SolrServerException(solrServerExceptionMessage, ex);
- }
- }
-
- /**
- * @return time allowed in nanos, returns -1 if no time_allowed is specified.
- */
- private long getTimeAllowedInNanos(final SolrRequest req) {
- SolrParams reqParams = req.getParams();
- return reqParams == null ? -1 :
- TimeUnit.NANOSECONDS.convert(reqParams.getInt(CommonParams.TIME_ALLOWED, -1), TimeUnit.MILLISECONDS);
- }
-
- private boolean isTimeExceeded(long timeAllowedNano, long timeOutTime) {
- return timeAllowedNano > 0 && System.nanoTime() > timeOutTime;
+ @Deprecated
+ public Rsp request(Req req) throws SolrServerException, IOException {
+ LBSolrClient.Rsp rsp = super.request(req);
+ // for backward-compatibility support
+ Rsp result = new Rsp();
+ result.rsp = rsp.rsp;
+ result.server = rsp.server;
+ return result;
}
-
- /**
- * Takes up one dead server and check for aliveness. The check is done in a roundrobin. Each server is checked for
- * aliveness once in 'x' millis where x is decided by the setAliveCheckinterval() or it is defaulted to 1 minute
- *
- * @param zombieServer a server in the dead pool
- */
- private void checkAZombieServer(ServerWrapper zombieServer) {
- try {
- QueryResponse resp = zombieServer.client.query(solrQuery);
- if (resp.getStatus() == 0) {
- // server has come back up.
- // make sure to remove from zombies before adding to alive to avoid a race condition
- // where another thread could mark it down, move it back to zombie, and then we delete
- // from zombie and lose it forever.
- ServerWrapper wrapper = zombieServers.remove(zombieServer.getKey());
- if (wrapper != null) {
- wrapper.failedPings = 0;
- if (wrapper.standard) {
- addToAlive(wrapper);
- }
- } else {
- // something else already moved the server from zombie to alive
- }
- }
- } catch (Exception e) {
- //Expected. The server is still down.
- zombieServer.failedPings++;
- // If the server doesn't belong in the standard set belonging to this load balancer
- // then simply drop it after a certain number of failed pings.
- if (!zombieServer.standard && zombieServer.failedPings >= NONSTANDARD_PING_LIMIT) {
- zombieServers.remove(zombieServer.getKey());
- }
- }
+ @Override
+ protected SolrClient getClient(String baseUrl) {
+ return urlToClient.computeIfAbsent(baseUrl, this::makeSolrClient);
}
- private void moveAliveToDead(ServerWrapper wrapper) {
- wrapper = removeFromAlive(wrapper.getKey());
- if (wrapper == null)
- return; // another thread already detected the failure and removed it
- zombieServers.put(wrapper.getKey(), wrapper);
- startAliveCheckExecutor();
+ @Override
+ protected void removeClient(String serverStr) {
+ urlToClient.remove(serverStr);
}
- private int interval = CHECK_INTERVAL;
-
- /**
- * LBHttpSolrServer keeps pinging the dead servers at fixed interval to find if it is alive. Use this to set that
- * interval
- *
- * @param interval time in milliseconds
- */
- public void setAliveCheckInterval(int interval) {
- if (interval <= 0) {
- throw new IllegalArgumentException("Alive check interval must be " +
- "positive, specified value = " + interval);
- }
- this.interval = interval;
+ @Override
+ public String removeSolrServer(String server) {
+ urlToClient.remove(server);
+ return super.removeSolrServer(server);
}
- private void startAliveCheckExecutor() {
- // double-checked locking, but it's OK because we don't *do* anything with aliveCheckExecutor
- // if it's not null.
- if (aliveCheckExecutor == null) {
- synchronized (this) {
- if (aliveCheckExecutor == null) {
- aliveCheckExecutor = Executors.newSingleThreadScheduledExecutor(
- new SolrjNamedThreadFactory("aliveCheckExecutor"));
- aliveCheckExecutor.scheduleAtFixedRate(
- getAliveCheckRunner(new WeakReference<>(this)),
- this.interval, this.interval, TimeUnit.MILLISECONDS);
- }
- }
+ @Override
+ public void close() {
+ super.close();
+ if(clientIsInternal) {
+ HttpClientUtil.close(httpClient);
}
}
- private static Runnable getAliveCheckRunner(final WeakReference<LBHttpSolrClient> lbRef) {
- return () -> {
- LBHttpSolrClient lb = lbRef.get();
- if (lb != null && lb.zombieServers != null) {
- for (ServerWrapper zombieServer : lb.zombieServers.values()) {
- lb.checkAZombieServer(zombieServer);
- }
- }
- };
- }
-
/**
* Return the HttpClient this instance uses.
*/
@@ -834,50 +275,6 @@ public class LBHttpSolrClient extends SolrClient {
return httpClient;
}
- public ResponseParser getParser() {
- return parser;
- }
-
- /**
- * Changes the {@link ResponseParser} that will be used for the internal
- * SolrServer objects.
- *
- * @param parser Default Response Parser chosen to parse the response if the parser
- * were not specified as part of the request.
- * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser()
- */
- public void setParser(ResponseParser parser) {
- this.parser = parser;
- }
-
- /**
- * Changes the {@link RequestWriter} that will be used for the internal
- * SolrServer objects.
- *
- * @param requestWriter Default RequestWriter, used to encode requests sent to the server.
- */
- public void setRequestWriter(RequestWriter requestWriter) {
- this.requestWriter = requestWriter;
- }
-
- public RequestWriter getRequestWriter() {
- return requestWriter;
- }
-
- @Override
- protected void finalize() throws Throwable {
- try {
- if(this.aliveCheckExecutor!=null)
- this.aliveCheckExecutor.shutdownNow();
- } finally {
- super.finalize();
- }
- }
-
- // defaults
- private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks
- private static final int NONSTANDARD_PING_LIMIT = 5; // number of times we'll ping dead servers not in the server list
-
/**
* Constructs {@link LBHttpSolrClient} instances from provided configuration.
*/
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/65fb0359/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
new file mode 100644
index 0000000..8f9b4f8
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/LBSolrClient.java
@@ -0,0 +1,707 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.net.ConnectException;
+import java.net.MalformedURLException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.solr.client.solrj.ResponseParser;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.IsUpdateRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.request.RequestWriter;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.SolrjNamedThreadFactory;
+import org.slf4j.MDC;
+
+import static org.apache.solr.common.params.CommonParams.ADMIN_PATHS;
+
+public abstract class LBSolrClient extends SolrClient {
+
+ // defaults
+ private static final Set<Integer> RETRY_CODES = new HashSet<>(Arrays.asList(404, 403, 503, 500));
+ private static final int CHECK_INTERVAL = 60 * 1000; //1 minute between checks
+ private static final int NONSTANDARD_PING_LIMIT = 5; // number of times we'll ping dead servers not in the server list
+
+ // keys to the maps are currently of the form "http://localhost:8983/solr"
+ // which should be equivalent to HttpSolrClient.getBaseURL()
+ private final Map<String, ServerWrapper> aliveServers = new LinkedHashMap<>();
+ // access to aliveServers should be synchronized on itself
+
+ private final Map<String, ServerWrapper> zombieServers = new ConcurrentHashMap<>();
+
+ // changes to aliveServers are reflected in this array, no need to synchronize
+ private volatile ServerWrapper[] aliveServerList = new ServerWrapper[0];
+
+
+ private ScheduledExecutorService aliveCheckExecutor;
+
+ private int interval = CHECK_INTERVAL;
+ private final AtomicInteger counter = new AtomicInteger(-1);
+
+ private static final SolrQuery solrQuery = new SolrQuery("*:*");
+ protected volatile ResponseParser parser;
+ protected volatile RequestWriter requestWriter;
+
+ protected Set<String> queryParams = new HashSet<>();
+
+ static {
+ solrQuery.setRows(0);
+ /**
+ * Default sort (if we don't supply a sort) is by score and since
+ * we request 0 rows any sorting and scoring is not necessary.
+ * SolrQuery.DOCID schema-independently specifies a non-scoring sort.
+ * <code>_docid_ asc</code> sort is efficient,
+ * <code>_docid_ desc</code> sort is not, so choose ascending DOCID sort.
+ */
+ solrQuery.setSort(SolrQuery.DOCID, SolrQuery.ORDER.asc);
+ // not a top-level request, we are interested only in the server being sent to i.e. it need not distribute our request to further servers
+ solrQuery.setDistrib(false);
+ }
+
+ protected static class ServerWrapper {
+ final String baseUrl;
+
+ // "standard" servers are used by default. They normally live in the alive list
+ // and move to the zombie list when unavailable. When they become available again,
+ // they move back to the alive list.
+ boolean standard = true;
+
+ int failedPings = 0;
+
+ ServerWrapper(String baseUrl) {
+ this.baseUrl = baseUrl;
+ }
+
+ public String getBaseUrl() {
+ return baseUrl;
+ }
+
+ @Override
+ public String toString() {
+ return baseUrl;
+ }
+
+ @Override
+ public int hashCode() {
+ return baseUrl.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (!(obj instanceof ServerWrapper)) return false;
+ return baseUrl.equals(((ServerWrapper)obj).baseUrl);
+ }
+ }
+
+
+ public static class Req {
+ protected SolrRequest request;
+ protected List<String> servers;
+ protected int numDeadServersToTry;
+ private final Integer numServersToTry;
+
+ public Req(SolrRequest request, List<String> servers) {
+ this(request, servers, null);
+ }
+
+ public Req(SolrRequest request, List<String> servers, Integer numServersToTry) {
+ this.request = request;
+ this.servers = servers;
+ this.numDeadServersToTry = servers.size();
+ this.numServersToTry = numServersToTry;
+ }
+
+ public SolrRequest getRequest() {
+ return request;
+ }
+ public List<String> getServers() {
+ return servers;
+ }
+
+ /** @return the number of dead servers to try if there are no live servers left */
+ public int getNumDeadServersToTry() {
+ return numDeadServersToTry;
+ }
+
+ /** @param numDeadServersToTry The number of dead servers to try if there are no live servers left.
+ * Defaults to the number of servers in this request. */
+ public void setNumDeadServersToTry(int numDeadServersToTry) {
+ this.numDeadServersToTry = numDeadServersToTry;
+ }
+
+ public Integer getNumServersToTry() {
+ return numServersToTry;
+ }
+ }
+
+ public static class Rsp {
+ protected String server;
+ protected NamedList<Object> rsp;
+
+ /** The response from the server */
+ public NamedList<Object> getResponse() {
+ return rsp;
+ }
+
+ /** The server that returned the response */
+ public String getServer() {
+ return server;
+ }
+ }
+
+ public LBSolrClient(List<String> baseSolrUrls) {
+ if (!baseSolrUrls.isEmpty()) {
+ for (String s : baseSolrUrls) {
+ ServerWrapper wrapper = createServerWrapper(s);
+ aliveServers.put(wrapper.getBaseUrl(), wrapper);
+ }
+ updateAliveList();
+ }
+ }
+
+ protected void updateAliveList() {
+ synchronized (aliveServers) {
+ aliveServerList = aliveServers.values().toArray(new ServerWrapper[0]);
+ }
+ }
+
+ protected ServerWrapper createServerWrapper(String baseUrl) {
+ return new ServerWrapper(baseUrl);
+ }
+
+ public Set<String> getQueryParams() {
+ return queryParams;
+ }
+
+ /**
+ * Expert Method.
+ * @param queryParams set of param keys to only send via the query string
+ */
+ public void setQueryParams(Set<String> queryParams) {
+ this.queryParams = queryParams;
+ }
+ public void addQueryParams(String queryOnlyParam) {
+ this.queryParams.add(queryOnlyParam) ;
+ }
+
+ public static String normalize(String server) {
+ if (server.endsWith("/"))
+ server = server.substring(0, server.length() - 1);
+ return server;
+ }
+
+
+ /**
+ * Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped.
+ * If a request fails due to an IOException, the server is moved to the dead pool for a certain period of
+ * time, or until a test request on that server succeeds.
+ *
+ * Servers are queried in the exact order given (except servers currently in the dead pool are skipped).
+ * If no live servers from the provided list remain to be tried, a number of previously skipped dead servers will be tried.
+ * Req.getNumDeadServersToTry() controls how many dead servers will be tried.
+ *
+ * If no live servers are found a SolrServerException is thrown.
+ *
+ * @param req contains both the request as well as the list of servers to query
+ *
+ * @return the result of the request
+ *
+ * @throws IOException If there is a low-level I/O error.
+ */
+ public Rsp request(Req req) throws SolrServerException, IOException {
+ Rsp rsp = new Rsp();
+ Exception ex = null;
+ boolean isNonRetryable = req.request instanceof IsUpdateRequest || ADMIN_PATHS.contains(req.request.getPath());
+ List<ServerWrapper> skipped = null;
+
+ final Integer numServersToTry = req.getNumServersToTry();
+ int numServersTried = 0;
+
+ boolean timeAllowedExceeded = false;
+ long timeAllowedNano = getTimeAllowedInNanos(req.getRequest());
+ long timeOutTime = System.nanoTime() + timeAllowedNano;
+ for (String serverStr : req.getServers()) {
+ if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) {
+ break;
+ }
+
+ serverStr = normalize(serverStr);
+ // if the server is currently a zombie, just skip to the next one
+ ServerWrapper wrapper = zombieServers.get(serverStr);
+ if (wrapper != null) {
+ // System.out.println("ZOMBIE SERVER QUERIED: " + serverStr);
+ final int numDeadServersToTry = req.getNumDeadServersToTry();
+ if (numDeadServersToTry > 0) {
+ if (skipped == null) {
+ skipped = new ArrayList<>(numDeadServersToTry);
+ skipped.add(wrapper);
+ }
+ else if (skipped.size() < numDeadServersToTry) {
+ skipped.add(wrapper);
+ }
+ }
+ continue;
+ }
+ try {
+ MDC.put("LBSolrClient.url", serverStr);
+
+ if (numServersToTry != null && numServersTried > numServersToTry.intValue()) {
+ break;
+ }
+
+ ++numServersTried;
+ ex = doRequest(serverStr, req, rsp, isNonRetryable, false);
+ if (!aliveServers.containsKey(serverStr) && !zombieServers.containsKey(serverStr)) {
+ removeClient(serverStr);
+ }
+ if (ex == null) {
+ return rsp; // SUCCESS
+ }
+ } finally {
+ MDC.remove("LBSolrClient.url");
+ }
+ }
+
+ // try the servers we previously skipped
+ if (skipped != null) {
+ for (ServerWrapper wrapper : skipped) {
+ if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) {
+ break;
+ }
+
+ if (numServersToTry != null && numServersTried > numServersToTry.intValue()) {
+ break;
+ }
+
+ try {
+ MDC.put("LBSolrClient.url", wrapper.getBaseUrl());
+ ++numServersTried;
+ ex = doRequest(wrapper.baseUrl, req, rsp, isNonRetryable, true);
+ if (ex == null) {
+ return rsp; // SUCCESS
+ }
+ } finally {
+ MDC.remove("LBSolrClient.url");
+ }
+ }
+ }
+
+
+ final String solrServerExceptionMessage;
+ if (timeAllowedExceeded) {
+ solrServerExceptionMessage = "Time allowed to handle this request exceeded";
+ } else {
+ if (numServersToTry != null && numServersTried > numServersToTry.intValue()) {
+ solrServerExceptionMessage = "No live SolrServers available to handle this request:"
+ + " numServersTried="+numServersTried
+ + " numServersToTry="+numServersToTry.intValue();
+ } else {
+ solrServerExceptionMessage = "No live SolrServers available to handle this request";
+ }
+ }
+ if (ex == null) {
+ throw new SolrServerException(solrServerExceptionMessage);
+ } else {
+ throw new SolrServerException(solrServerExceptionMessage+":" + zombieServers.keySet(), ex);
+ }
+ }
+
+ /**
+  * Extension hook for subclasses. A client may have been created for a server that does not
+  * belong to this load balancer's server list in order to handle a {@code Req}; subclasses
+  * that cache such clients can override this method to evict the client again.
+  * The default implementation does nothing.
+  *
+  * @param serverStr base URL of the server whose client may be discarded
+  */
+ protected void removeClient(String serverStr) {
+   // no-op by default
+ }
+
+ /**
+ * @return time allowed in nanos, returns -1 if no time_allowed is specified.
+ */
+ private long getTimeAllowedInNanos(final SolrRequest req) {
+ SolrParams reqParams = req.getParams();
+ return reqParams == null ? -1 :
+ TimeUnit.NANOSECONDS.convert(reqParams.getInt(CommonParams.TIME_ALLOWED, -1), TimeUnit.MILLISECONDS);
+ }
+
+ /**
+  * Tells whether the time budget of a request has run out.
+  *
+  * @param timeAllowedNano the allowed time in nanoseconds; values &lt;= 0 mean "no limit"
+  * @param timeOutTime     the deadline, expressed as a {@link System#nanoTime()} value
+  * @return {@code true} only if a limit is set and the deadline has passed
+  */
+ private boolean isTimeExceeded(long timeAllowedNano, long timeOutTime) {
+   // System.nanoTime() values may wrap around, so they must be compared by subtraction
+   // (now - deadline > 0) rather than with a direct relational comparison.
+   return timeAllowedNano > 0 && System.nanoTime() - timeOutTime > 0;
+ }
+
+ protected Exception doRequest(String baseUrl, Req req, Rsp rsp, boolean isNonRetryable,
+ boolean isZombie) throws SolrServerException, IOException {
+ Exception ex = null;
+ try {
+ rsp.server = baseUrl;
+ req.getRequest().setBasePath(baseUrl);
+ rsp.rsp = getClient(baseUrl).request(req.getRequest(), (String) null);
+ if (isZombie) {
+ zombieServers.remove(baseUrl);
+ }
+ } catch (HttpSolrClient.RemoteExecutionException e){
+ throw e;
+ } catch(SolrException e) {
+ // we retry on 404 or 403 or 503 or 500
+ // unless it's an update - then we only retry on connect exception
+ if (!isNonRetryable && RETRY_CODES.contains(e.code())) {
+ ex = (!isZombie) ? addZombie(baseUrl, e) : e;
+ } else {
+ // Server is alive but the request was likely malformed or invalid
+ if (isZombie) {
+ zombieServers.remove(baseUrl);
+ }
+ throw e;
+ }
+ } catch (SocketException e) {
+ if (!isNonRetryable || e instanceof ConnectException) {
+ ex = (!isZombie) ? addZombie(baseUrl, e) : e;
+ } else {
+ throw e;
+ }
+ } catch (SocketTimeoutException e) {
+ if (!isNonRetryable) {
+ ex = (!isZombie) ? addZombie(baseUrl, e) : e;
+ } else {
+ throw e;
+ }
+ } catch (SolrServerException e) {
+ Throwable rootCause = e.getRootCause();
+ if (!isNonRetryable && rootCause instanceof IOException) {
+ ex = (!isZombie) ? addZombie(baseUrl, e) : e;
+ } else if (isNonRetryable && rootCause instanceof ConnectException) {
+ ex = (!isZombie) ? addZombie(baseUrl, e) : e;
+ } else {
+ throw e;
+ }
+ } catch (Exception e) {
+ throw new SolrServerException(e);
+ }
+
+ return ex;
+ }
+
+ /**
+ * Supplies the client used to talk to a single server. Within this class the returned client
+ * is used both to send user requests and to ping zombie servers.
+ *
+ * @param baseUrl base URL of the target server
+ * @return the {@link SolrClient} to use for {@code baseUrl}
+ */
+ protected abstract SolrClient getClient(String baseUrl);
+
+ private Exception addZombie(String serverStr, Exception e) {
+ ServerWrapper wrapper = createServerWrapper(serverStr);
+ wrapper.standard = false;
+ zombieServers.put(serverStr, wrapper);
+ startAliveCheckExecutor();
+ return e;
+ }
+
+ /**
+  * Sets the interval at which this load balancer pings dead (zombie) servers to find out
+  * whether they are alive again. The value is read when the alive-check executor is started.
+  *
+  * @param interval time in milliseconds, must be positive
+  * @throws IllegalArgumentException if {@code interval} is zero or negative
+  */
+ public void setAliveCheckInterval(int interval) {
+   if (interval > 0) {
+     this.interval = interval;
+     return;
+   }
+   throw new IllegalArgumentException("Alive check interval must be " +
+       "positive, specified value = " + interval);
+ }
+
+ /**
+  * Lazily creates the single-threaded scheduler that periodically checks the zombie servers.
+  * Does nothing if the scheduler already exists.
+  */
+ private void startAliveCheckExecutor() {
+   // double-checked locking, but it's OK because we don't *do* anything with aliveCheckExecutor
+   // if it's not null.
+   if (aliveCheckExecutor != null) {
+     return;
+   }
+   synchronized (this) {
+     if (aliveCheckExecutor != null) {
+       return;
+     }
+     aliveCheckExecutor = Executors.newSingleThreadScheduledExecutor(
+         new SolrjNamedThreadFactory("aliveCheckExecutor"));
+     // the task only gets a weak reference to this client
+     aliveCheckExecutor.scheduleAtFixedRate(
+         getAliveCheckRunner(new WeakReference<>(this)),
+         this.interval, this.interval, TimeUnit.MILLISECONDS);
+   }
+ }
+
+ private static Runnable getAliveCheckRunner(final WeakReference<LBSolrClient> lbRef) {
+ return () -> {
+ LBSolrClient lb = lbRef.get();
+ if (lb != null && lb.zombieServers != null) {
+ for (Object zombieServer : lb.zombieServers.values()) {
+ lb.checkAZombieServer((ServerWrapper)zombieServer);
+ }
+ }
+ };
+ }
+
+ /**
+ * @return the {@link ResponseParser} currently configured on this load balancer
+ */
+ public ResponseParser getParser() {
+ return parser;
+ }
+
+ /**
+ * Changes the {@link ResponseParser} configured on this load balancer.
+ *
+ * @param parser Default response parser, chosen to parse the response if the parser
+ * was not specified as part of the request.
+ * @see org.apache.solr.client.solrj.SolrRequest#getResponseParser()
+ */
+ public void setParser(ResponseParser parser) {
+ this.parser = parser;
+ }
+
+ /**
+ * Changes the {@link RequestWriter} configured on this load balancer.
+ *
+ * @param requestWriter Default RequestWriter, used to encode requests sent to the server.
+ */
+ public void setRequestWriter(RequestWriter requestWriter) {
+ this.requestWriter = requestWriter;
+ }
+
+ /**
+ * @return the {@link RequestWriter} currently configured on this load balancer
+ */
+ public RequestWriter getRequestWriter() {
+ return requestWriter;
+ }
+
+ /**
+  * Safety net: stops the alive-check scheduler, if one was started, when this object is
+  * finalized, then delegates to {@code super.finalize()}.
+  */
+ @Override
+ protected void finalize() throws Throwable {
+   try {
+     if (this.aliveCheckExecutor != null) {
+       this.aliveCheckExecutor.shutdownNow();
+     }
+   } finally {
+     super.finalize();
+   }
+ }
+
+ private void checkAZombieServer(ServerWrapper zombieServer) {
+ try {
+ QueryRequest queryRequest = new QueryRequest(solrQuery);
+ queryRequest.setBasePath(zombieServer.baseUrl);
+ QueryResponse resp = queryRequest.process(getClient(zombieServer.getBaseUrl()));
+ if (resp.getStatus() == 0) {
+ // server has come back up.
+ // make sure to remove from zombies before adding to alive to avoid a race condition
+ // where another thread could mark it down, move it back to zombie, and then we delete
+ // from zombie and lose it forever.
+ ServerWrapper wrapper = zombieServers.remove(zombieServer.getBaseUrl());
+ if (wrapper != null) {
+ wrapper.failedPings = 0;
+ if (wrapper.standard) {
+ addToAlive(wrapper);
+ }
+ } else {
+ // something else already moved the server from zombie to alive
+ }
+ }
+ } catch (Exception e) {
+ //Expected. The server is still down.
+ zombieServer.failedPings++;
+
+ // If the server doesn't belong in the standard set belonging to this load balancer
+ // then simply drop it after a certain number of failed pings.
+ if (!zombieServer.standard && zombieServer.failedPings >= NONSTANDARD_PING_LIMIT) {
+ zombieServers.remove(zombieServer.getBaseUrl());
+ }
+ }
+ }
+
+ private ServerWrapper removeFromAlive(String key) {
+ synchronized (aliveServers) {
+ ServerWrapper wrapper = aliveServers.remove(key);
+ if (wrapper != null)
+ updateAliveList();
+ return wrapper;
+ }
+ }
+
+
+ /**
+  * Registers a server in the alive map under its base URL (replacing any previous entry)
+  * and refreshes the alive list.
+  *
+  * @param wrapper the server to mark as alive
+  */
+ private void addToAlive(ServerWrapper wrapper) {
+   synchronized (aliveServers) {
+     // TODO: warn if there was a previous entry?
+     aliveServers.put(wrapper.getBaseUrl(), wrapper);
+     updateAliveList();
+   }
+ }
+
+ /**
+  * Adds a server to the set of alive servers of this load balancer.
+  *
+  * @param server base URL of the server to add
+  * @throws MalformedURLException declared for callers; not thrown by the code visible here
+  */
+ public void addSolrServer(String server) throws MalformedURLException {
+   final ServerWrapper wrapper = createServerWrapper(server);
+   addToAlive(wrapper);
+ }
+
+ /**
+  * Removes a server from both the alive and the zombie collections. The given URL is
+  * normalised via {@link URL#toExternalForm()} and a single trailing slash is stripped
+  * before the lookup.
+  *
+  * @param server URL of the server to remove
+  * @return always {@code null}
+  * @throws RuntimeException wrapping a {@link MalformedURLException} if {@code server} is not
+  *                          a valid URL
+  */
+ public String removeSolrServer(String server) {
+   final String externalForm;
+   try {
+     externalForm = new URL(server).toExternalForm();
+   } catch (MalformedURLException e) {
+     throw new RuntimeException(e);
+   }
+   final String key = externalForm.endsWith("/")
+       ? externalForm.substring(0, externalForm.length() - 1)
+       : externalForm;
+
+   // there is a small race condition here - if the server is in the process of being moved between
+   // lists, we could fail to remove it.
+   removeFromAlive(key);
+   zombieServers.remove(key);
+   return null;
+ }
+
+ /**
+  * Tries to query a live server without limiting the number of servers to try.
+  * A SolrServerException is thrown if all servers are dead. If the request failed due to an
+  * IOException, the live server is moved to the dead pool and the request is retried on
+  * another live server. After live servers are exhausted, any servers previously marked as
+  * dead will be tried before failing the request.
+  *
+  * @param request    the SolrRequest.
+  * @param collection passed through to the underlying client
+  *
+  * @return response
+  *
+  * @throws SolrServerException if no server could handle the request
+  * @throws IOException If there is a low-level I/O error.
+  */
+ @Override
+ public NamedList<Object> request(final SolrRequest request, String collection)
+     throws SolrServerException, IOException {
+   final Integer noServerLimit = null;
+   return request(request, collection, noServerLimit);
+ }
+
+ /**
+  * Sends the request to the alive servers in round-robin order and, if none of them
+  * succeeds, to the standard zombie servers that did not just fail.
+  * <p>
+  * A server whose request fails with an {@link IOException} root cause is moved to the
+  * zombie map and the next server is tried. A zombie server that answers successfully is
+  * moved back to the alive list. The time limit read from the request parameters is checked
+  * before every attempt.
+  *
+  * @param request         the SolrRequest
+  * @param collection      passed through to the underlying client
+  * @param numServersToTry maximum number of attempts against alive servers, or {@code null}
+  *                        to try each server of the current alive list once
+  * @return the response of the first server that handled the request
+  * @throws SolrServerException if the time allowed was exceeded, no server could handle the
+  *                             request, or a server failed with a non-I/O error
+  * @throws IOException         If there is a low-level I/O error.
+  */
+ public NamedList<Object> request(final SolrRequest request, String collection,
+     final Integer numServersToTry) throws SolrServerException, IOException {
+   Exception ex = null;
+   ServerWrapper[] serverList = aliveServerList;
+
+   // With an empty alive list there is nothing to round-robin over. Skipping the loop also
+   // keeps "count % serverList.length" below from dividing by zero when numServersToTry is
+   // given explicitly; the zombie servers are still tried afterwards.
+   final int maxTries;
+   if (serverList.length == 0) {
+     maxTries = 0;
+   } else {
+     maxTries = (numServersToTry == null ? serverList.length : numServersToTry.intValue());
+   }
+   int numServersTried = 0;
+   Map<String,ServerWrapper> justFailed = null;
+
+   boolean timeAllowedExceeded = false;
+   long timeAllowedNano = getTimeAllowedInNanos(request);
+   long timeOutTime = System.nanoTime() + timeAllowedNano;
+   for (int attempts=0; attempts<maxTries; attempts++) {
+     if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) {
+       break;
+     }
+
+     // mask the sign bit so the index stays non-negative after the counter overflows
+     int count = counter.incrementAndGet() & Integer.MAX_VALUE;
+     ServerWrapper wrapper = serverList[count % serverList.length];
+
+     try {
+       ++numServersTried;
+       request.setBasePath(wrapper.baseUrl);
+       return getClient(wrapper.getBaseUrl()).request(request, collection);
+     } catch (SolrException e) {
+       // Server is alive but the request was malformed or invalid
+       throw e;
+     } catch (SolrServerException e) {
+       if (e.getRootCause() instanceof IOException) {
+         ex = e;
+         moveAliveToDead(wrapper);
+         if (justFailed == null) justFailed = new HashMap<>();
+         justFailed.put(wrapper.getBaseUrl(), wrapper);
+       } else {
+         throw e;
+       }
+     } catch (Exception e) {
+       throw new SolrServerException(e);
+     }
+   }
+
+   // try other standard servers that we didn't try just now
+   for (ServerWrapper wrapper : zombieServers.values()) {
+     if (timeAllowedExceeded = isTimeExceeded(timeAllowedNano, timeOutTime)) {
+       break;
+     }
+
+     if (wrapper.standard==false || justFailed!=null && justFailed.containsKey(wrapper.getBaseUrl())) continue;
+     try {
+       ++numServersTried;
+       request.setBasePath(wrapper.baseUrl);
+       NamedList<Object> rsp = getClient(wrapper.baseUrl).request(request, collection);
+       // remove from zombie list *before* adding to alive to avoid a race that could lose a server
+       zombieServers.remove(wrapper.getBaseUrl());
+       addToAlive(wrapper);
+       return rsp;
+     } catch (SolrException e) {
+       // Server is alive but the request was malformed or invalid
+       throw e;
+     } catch (SolrServerException e) {
+       if (e.getRootCause() instanceof IOException) {
+         ex = e;
+         // still dead
+       } else {
+         throw e;
+       }
+     } catch (Exception e) {
+       throw new SolrServerException(e);
+     }
+   }
+
+
+   final String solrServerExceptionMessage;
+   if (timeAllowedExceeded) {
+     solrServerExceptionMessage = "Time allowed to handle this request exceeded";
+   } else {
+     if (numServersToTry != null && numServersTried > numServersToTry.intValue()) {
+       solrServerExceptionMessage = "No live SolrServers available to handle this request:"
+           + " numServersTried="+numServersTried
+           + " numServersToTry="+numServersToTry.intValue();
+     } else {
+       solrServerExceptionMessage = "No live SolrServers available to handle this request";
+     }
+   }
+   if (ex == null) {
+     throw new SolrServerException(solrServerExceptionMessage);
+   } else {
+     throw new SolrServerException(solrServerExceptionMessage, ex);
+   }
+ }
+
+ private void moveAliveToDead(ServerWrapper wrapper) {
+ wrapper = removeFromAlive(wrapper.getBaseUrl());
+ if (wrapper == null)
+ return; // another thread already detected the failure and removed it
+ zombieServers.put(wrapper.getBaseUrl(), wrapper);
+ startAliveCheckExecutor();
+ }
+
+ /**
+  * Stops the alive-check scheduler if it was ever started.
+  */
+ @Override
+ public void close() {
+   if (aliveCheckExecutor == null) {
+     return;
+   }
+   aliveCheckExecutor.shutdownNow();
+ }
+}