You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2020/06/02 06:02:20 UTC
[lucene-solr] branch branch_8x updated: SOLR-14531: Refactor out
internode requests from HttpShardHandler
This is an automated email from the ASF dual-hosted git repository.
noble pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/branch_8x by this push:
new a156b3d SOLR-14531: Refactor out internode requests from HttpShardHandler
a156b3d is described below
commit a156b3dd4bc5b14a2725f2907965e22a2f09b6eb
Author: noble <no...@apache.org>
AuthorDate: Tue Jun 2 16:00:58 2020 +1000
SOLR-14531: Refactor out internode requests from HttpShardHandler
---
.../solr/handler/component/HttpShardHandler.java | 140 +-----------------
.../solr/handler/component/ShardRequestor.java | 162 +++++++++++++++++++++
2 files changed, 167 insertions(+), 135 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
index f23cf16..78f5c77 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
@@ -17,26 +17,15 @@
package org.apache.solr.handler.component;
import java.io.IOException;
-import java.net.ConnectException;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import io.opentracing.Span;
-import io.opentracing.Tracer;
-import io.opentracing.propagation.Format;
import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
-import org.apache.solr.client.solrj.impl.LBSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
import org.apache.solr.cloud.CloudDescriptor;
@@ -44,17 +33,12 @@ import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.request.SolrRequestInfo;
-import org.apache.solr.util.tracing.GlobalTracer;
-import org.apache.solr.util.tracing.SolrRequestCarrier;
-import org.slf4j.MDC;
public class HttpShardHandler extends ShardHandler {
/**
@@ -65,10 +49,9 @@ public class HttpShardHandler extends ShardHandler {
*/
public static String ONLY_NRT_REPLICAS = "distribOnlyRealtime";
- private HttpShardHandlerFactory httpShardHandlerFactory;
+ final HttpShardHandlerFactory httpShardHandlerFactory;
private CompletionService<ShardResponse> completionService;
private Set<Future<ShardResponse>> pending;
- private Map<String, List<String>> shardToURLs;
private Http2SolrClient httpClient;
public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory, Http2SolrClient httpClient) {
@@ -76,130 +59,17 @@ public class HttpShardHandler extends ShardHandler {
this.httpShardHandlerFactory = httpShardHandlerFactory;
completionService = httpShardHandlerFactory.newCompletionService();
pending = new HashSet<>();
-
- // maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574")
- // This is primarily to keep track of what order we should use to query the replicas of a shard
- // so that we use the same replica for all phases of a distributed request.
- shardToURLs = new HashMap<>();
- }
-
-
- private static class SimpleSolrResponse extends SolrResponse {
-
- long elapsedTime;
-
- NamedList<Object> nl;
-
- @Override
- public long getElapsedTime() {
- return elapsedTime;
- }
-
- @Override
- public NamedList<Object> getResponse() {
- return nl;
- }
-
- @Override
- public void setResponse(NamedList<Object> rsp) {
- nl = rsp;
- }
-
- @Override
- public void setElapsedTime(long elapsedTime) {
- this.elapsedTime = elapsedTime;
- }
}
- // Not thread safe... don't use in Callable.
- // Don't modify the returned URL list.
- private List<String> getURLs(String shard) {
- List<String> urls = shardToURLs.get(shard);
- if (urls == null) {
- urls = httpShardHandlerFactory.buildURLList(shard);
- shardToURLs.put(shard, urls);
- }
- return urls;
- }
-
@Override
public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
- // do this outside of the callable for thread safety reasons
- final List<String> urls = getURLs(shard);
- final Tracer tracer = GlobalTracer.getTracer();
- final Span span = tracer != null ? tracer.activeSpan() : null;
-
- Callable<ShardResponse> task = () -> {
-
- ShardResponse srsp = new ShardResponse();
- if (sreq.nodeName != null) {
- srsp.setNodeName(sreq.nodeName);
- }
- srsp.setShardRequest(sreq);
- srsp.setShard(shard);
- SimpleSolrResponse ssr = new SimpleSolrResponse();
- srsp.setSolrResponse(ssr);
- long startTime = System.nanoTime();
-
- try {
- params.remove(CommonParams.WT); // use default (currently javabin)
- params.remove(CommonParams.VERSION);
-
- QueryRequest req = makeQueryRequest(sreq, params, shard);
- if (tracer != null && span != null) {
- tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new SolrRequestCarrier(req));
- }
- req.setMethod(SolrRequest.METHOD.POST);
- SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
- if (requestInfo != null) req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
-
- // no need to set the response parser as binary is the defaultJab
- // req.setResponseParser(new BinaryResponseParser());
-
- // if there are no shards available for a slice, urls.size()==0
- if (urls.size() == 0) {
- // TODO: what's the right error code here? We should use the same thing when
- // all of the servers for a shard are down.
- throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
- }
-
- if (urls.size() <= 1) {
- String url = urls.get(0);
- srsp.setShardAddress(url);
- ssr.nl = request(url, req);
- } else {
- LBSolrClient.Rsp rsp = httpShardHandlerFactory.makeLoadBalancedRequest(req, urls);
- ssr.nl = rsp.getResponse();
- srsp.setShardAddress(rsp.getServer());
- }
- } catch (ConnectException cex) {
- srsp.setException(cex); //????
- } catch (Exception th) {
- srsp.setException(th);
- if (th instanceof SolrException) {
- srsp.setResponseCode(((SolrException) th).code());
- } else {
- srsp.setResponseCode(-1);
- }
- }
-
- ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-
- return transfomResponse(sreq, srsp, shard);
- };
-
+ ShardRequestor shardRequestor = new ShardRequestor(sreq, shard, params, this); // a new requestor per submit; its constructor resolves the URL list and captures the tracer/span before the task is queued
try {
- if (shard != null) {
- MDC.put("ShardRequest.shards", shard);
- }
- if (urls != null && !urls.isEmpty()) {
- MDC.put("ShardRequest.urlList", urls.toString());
- }
- pending.add(completionService.submit(task));
+ shardRequestor.init(); // puts the shard name and URL list into the MDC for the duration of the submission
+ pending.add(completionService.submit(shardRequestor)); // track the Future so the response can be collected later
} finally {
- MDC.remove("ShardRequest.shards");
- MDC.remove("ShardRequest.urlList");
+ shardRequestor.end(); // always remove the MDC keys, even if submission throws
}
}
diff --git a/solr/core/src/java/org/apache/solr/handler/component/ShardRequestor.java b/solr/core/src/java/org/apache/solr/handler/component/ShardRequestor.java
new file mode 100644
index 0000000..0ad53ba
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/component/ShardRequestor.java
@@ -0,0 +1,162 @@
+package org.apache.solr.handler.component;
+
+import io.opentracing.Span;
+import io.opentracing.Tracer;
+import io.opentracing.propagation.Format;
+import java.net.ConnectException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.impl.LBSolrClient;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.request.SolrRequestInfo;
+import org.apache.solr.util.tracing.GlobalTracer;
+import org.apache.solr.util.tracing.SolrRequestCarrier;
+import org.slf4j.MDC;
+
+class ShardRequestor implements Callable<ShardResponse> { // Callable that sends one ShardRequest to one shard and returns the resulting ShardResponse
+ private final ShardRequest sreq;
+ private final String shard;
+ private final ModifiableSolrParams params; // mutated by call(): WT and VERSION are removed before the request is built
+ private final Tracer tracer; // from GlobalTracer.getTracer() at construction; call() null-checks it
+ private final Span span; // tracer.activeSpan() at construction time, or null when there is no tracer
+ private final List<String> urls; // resolved in the constructor, i.e. before call() runs
+ private final HttpShardHandler httpShardHandler; // supplies makeQueryRequest, request, transfomResponse and the factory
+
+ // maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574")
+ // This is primarily to keep track of what order we should use to query the replicas of a shard
+ // so that we use the same replica for all phases of a distributed request.
+ private Map<String, List<String>> shardToURLs = new HashMap<>(); // NOTE(review): this is an instance field and HttpShardHandler.submit creates a new ShardRequestor per call, so the map starts empty each time and getURLs always calls buildURLList -- confirm the ordering guarantee described above still holds
+
+ public ShardRequestor(ShardRequest sreq, String shard, ModifiableSolrParams params, HttpShardHandler httpShardHandler) {
+ this.sreq = sreq;
+ this.shard = shard;
+ this.params = params;
+ this.httpShardHandler = httpShardHandler;
+ // do this before call() for thread safety reasons
+ this.urls = getURLs(shard);
+ tracer = GlobalTracer.getTracer();
+ span = tracer != null ? tracer.activeSpan() : null;
+ }
+
+
+ // Not thread safe... don't use in Callable.
+ // Don't modify the returned URL list.
+ private List<String> getURLs(String shard) { // returns the cached list for this shard key, building and caching it via the factory on a miss
+ List<String> urls = shardToURLs.get(shard);
+ if (urls == null) {
+ urls = httpShardHandler.httpShardHandlerFactory.buildURLList(shard);
+ shardToURLs.put(shard, urls);
+ }
+ return urls;
+ }
+
+ void init() { // puts the shard name and the URL list (when non-empty) into the SLF4J MDC
+ if (shard != null) {
+ MDC.put("ShardRequest.shards", shard);
+ }
+ if (urls != null && !urls.isEmpty()) {
+ MDC.put("ShardRequest.urlList", urls.toString());
+ }
+ }
+
+ void end() { // removes the two MDC keys that init() may have set
+ MDC.remove("ShardRequest.shards");
+ MDC.remove("ShardRequest.urlList");
+ }
+
+ @Override
+ public ShardResponse call() throws Exception { // exceptions raised inside the try block below are recorded on the ShardResponse rather than propagated
+
+ ShardResponse srsp = new ShardResponse();
+ if (sreq.nodeName != null) {
+ srsp.setNodeName(sreq.nodeName);
+ }
+ srsp.setShardRequest(sreq);
+ srsp.setShard(shard);
+ SimpleSolrResponse ssr = new SimpleSolrResponse();
+ srsp.setSolrResponse(ssr);
+ long startTime = System.nanoTime();
+
+ try {
+ params.remove(CommonParams.WT); // use default (currently javabin)
+ params.remove(CommonParams.VERSION);
+
+ QueryRequest req = httpShardHandler.makeQueryRequest(sreq, params, shard);
+ if (tracer != null && span != null) {
+ tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new SolrRequestCarrier(req)); // pass the span context captured in the constructor along with the outgoing request
+ }
+ req.setMethod(SolrRequest.METHOD.POST);
+ SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
+ if (requestInfo != null) req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
+
+ // no need to set the response parser as binary is the default
+ // req.setResponseParser(new BinaryResponseParser());
+
+ // if there are no shards available for a slice, urls.size()==0
+ if (urls.size() == 0) {
+ // TODO: what's the right error code here? We should use the same thing when
+ // all of the servers for a shard are down.
+ throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
+ }
+
+ if (urls.size() <= 1) { // size is exactly 1 here (the empty case threw above): direct request, no load balancing
+ String url = urls.get(0);
+ srsp.setShardAddress(url);
+ ssr.nl = httpShardHandler.request(url, req);
+ } else {
+ LBSolrClient.Rsp rsp = httpShardHandler.httpShardHandlerFactory.makeLoadBalancedRequest(req, urls);
+ ssr.nl = rsp.getResponse();
+ srsp.setShardAddress(rsp.getServer()); // record which server actually answered
+ }
+ } catch (ConnectException cex) {
+ srsp.setException(cex); //???? NOTE(review): unlike the generic catch below, no response code is set here
+ } catch (Exception th) {
+ srsp.setException(th);
+ if (th instanceof SolrException) {
+ srsp.setResponseCode(((SolrException) th).code());
+ } else {
+ srsp.setResponseCode(-1);
+ }
+ }
+
+ ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); // elapsed time in milliseconds, recorded on both success and failure
+
+ return httpShardHandler.transfomResponse(sreq, srsp, shard);
+ }
+
+ static class SimpleSolrResponse extends SolrResponse { // minimal SolrResponse holding only the response NamedList and the elapsed time
+
+ long elapsedTime;
+
+ NamedList<Object> nl;
+
+ @Override
+ public long getElapsedTime() {
+ return elapsedTime;
+ }
+
+ @Override
+ public NamedList<Object> getResponse() {
+ return nl;
+ }
+
+ @Override
+ public void setResponse(NamedList<Object> rsp) {
+ nl = rsp;
+ }
+
+ @Override
+ public void setElapsedTime(long elapsedTime) {
+ this.elapsedTime = elapsedTime;
+ }
+ }
+}