You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2020/06/02 06:02:20 UTC

[lucene-solr] branch branch_8x updated: SOLR-14531: Refactor out internode requests from HttpShardHandler

This is an automated email from the ASF dual-hosted git repository.

noble pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new a156b3d  SOLR-14531: Refactor out internode requests from HttpShardHandler
a156b3d is described below

commit a156b3dd4bc5b14a2725f2907965e22a2f09b6eb
Author: noble <no...@apache.org>
AuthorDate: Tue Jun 2 16:00:58 2020 +1000

    SOLR-14531: Refactor out internode requests from HttpShardHandler
---
 .../solr/handler/component/HttpShardHandler.java   | 140 +-----------------
 .../solr/handler/component/ShardRequestor.java     | 162 +++++++++++++++++++++
 2 files changed, 167 insertions(+), 135 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
index f23cf16..78f5c77 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
@@ -17,26 +17,15 @@
 package org.apache.solr.handler.component;
 
 import java.io.IOException;
-import java.net.ConnectException;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import io.opentracing.Span;
-import io.opentracing.Tracer;
-import io.opentracing.propagation.Format;
 import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.Http2SolrClient;
-import org.apache.solr.client.solrj.impl.LBSolrClient;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.routing.ReplicaListTransformer;
 import org.apache.solr.cloud.CloudDescriptor;
@@ -44,17 +33,12 @@ import org.apache.solr.cloud.ZkController;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
-import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.ShardParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.request.SolrRequestInfo;
-import org.apache.solr.util.tracing.GlobalTracer;
-import org.apache.solr.util.tracing.SolrRequestCarrier;
-import org.slf4j.MDC;
 
 public class HttpShardHandler extends ShardHandler {
   /**
@@ -65,10 +49,9 @@ public class HttpShardHandler extends ShardHandler {
    */
   public static String ONLY_NRT_REPLICAS = "distribOnlyRealtime";
 
-  private HttpShardHandlerFactory httpShardHandlerFactory;
+  final HttpShardHandlerFactory httpShardHandlerFactory;
   private CompletionService<ShardResponse> completionService;
   private Set<Future<ShardResponse>> pending;
-  private Map<String, List<String>> shardToURLs;
   private Http2SolrClient httpClient;
 
   public HttpShardHandler(HttpShardHandlerFactory httpShardHandlerFactory, Http2SolrClient httpClient) {
@@ -76,130 +59,17 @@ public class HttpShardHandler extends ShardHandler {
     this.httpShardHandlerFactory = httpShardHandlerFactory;
     completionService = httpShardHandlerFactory.newCompletionService();
     pending = new HashSet<>();
-
-    // maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574")
-    // This is primarily to keep track of what order we should use to query the replicas of a shard
-    // so that we use the same replica for all phases of a distributed request.
-    shardToURLs = new HashMap<>();
-  }
-
-
-  private static class SimpleSolrResponse extends SolrResponse {
-
-    long elapsedTime;
-
-    NamedList<Object> nl;
-
-    @Override
-    public long getElapsedTime() {
-      return elapsedTime;
-    }
-
-    @Override
-    public NamedList<Object> getResponse() {
-      return nl;
-    }
-
-    @Override
-    public void setResponse(NamedList<Object> rsp) {
-      nl = rsp;
-    }
-
-    @Override
-    public void setElapsedTime(long elapsedTime) {
-      this.elapsedTime = elapsedTime;
-    }
   }
 
 
-  // Not thread safe... don't use in Callable.
-  // Don't modify the returned URL list.
-  private List<String> getURLs(String shard) {
-    List<String> urls = shardToURLs.get(shard);
-    if (urls == null) {
-      urls = httpShardHandlerFactory.buildURLList(shard);
-      shardToURLs.put(shard, urls);
-    }
-    return urls;
-  }
-
   @Override
   public void submit(final ShardRequest sreq, final String shard, final ModifiableSolrParams params) {
-    // do this outside of the callable for thread safety reasons
-    final List<String> urls = getURLs(shard);
-    final Tracer tracer = GlobalTracer.getTracer();
-    final Span span = tracer != null ? tracer.activeSpan() : null;
-
-    Callable<ShardResponse> task = () -> {
-
-      ShardResponse srsp = new ShardResponse();
-      if (sreq.nodeName != null) {
-        srsp.setNodeName(sreq.nodeName);
-      }
-      srsp.setShardRequest(sreq);
-      srsp.setShard(shard);
-      SimpleSolrResponse ssr = new SimpleSolrResponse();
-      srsp.setSolrResponse(ssr);
-      long startTime = System.nanoTime();
-
-      try {
-        params.remove(CommonParams.WT); // use default (currently javabin)
-        params.remove(CommonParams.VERSION);
-
-        QueryRequest req = makeQueryRequest(sreq, params, shard);
-        if (tracer != null && span != null) {
-          tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new SolrRequestCarrier(req));
-        }
-        req.setMethod(SolrRequest.METHOD.POST);
-        SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
-        if (requestInfo != null) req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
-
-        // no need to set the response parser as binary is the defaultJab
-        // req.setResponseParser(new BinaryResponseParser());
-
-        // if there are no shards available for a slice, urls.size()==0
-        if (urls.size() == 0) {
-          // TODO: what's the right error code here? We should use the same thing when
-          // all of the servers for a shard are down.
-          throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
-        }
-
-        if (urls.size() <= 1) {
-          String url = urls.get(0);
-          srsp.setShardAddress(url);
-          ssr.nl = request(url, req);
-        } else {
-          LBSolrClient.Rsp rsp = httpShardHandlerFactory.makeLoadBalancedRequest(req, urls);
-          ssr.nl = rsp.getResponse();
-          srsp.setShardAddress(rsp.getServer());
-        }
-      } catch (ConnectException cex) {
-        srsp.setException(cex); //????
-      } catch (Exception th) {
-        srsp.setException(th);
-        if (th instanceof SolrException) {
-          srsp.setResponseCode(((SolrException) th).code());
-        } else {
-          srsp.setResponseCode(-1);
-        }
-      }
-
-      ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
-
-      return transfomResponse(sreq, srsp, shard);
-    };
-
+    ShardRequestor shardRequestor = new ShardRequestor(sreq, shard, params, this);
     try {
-      if (shard != null) {
-        MDC.put("ShardRequest.shards", shard);
-      }
-      if (urls != null && !urls.isEmpty()) {
-        MDC.put("ShardRequest.urlList", urls.toString());
-      }
-      pending.add(completionService.submit(task));
+      shardRequestor.init();
+      pending.add(completionService.submit(shardRequestor));
     } finally {
-      MDC.remove("ShardRequest.shards");
-      MDC.remove("ShardRequest.urlList");
+      shardRequestor.end();
     }
   }
 
diff --git a/solr/core/src/java/org/apache/solr/handler/component/ShardRequestor.java b/solr/core/src/java/org/apache/solr/handler/component/ShardRequestor.java
new file mode 100644
index 0000000..0ad53ba
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/handler/component/ShardRequestor.java
@@ -0,0 +1,162 @@
+package org.apache.solr.handler.component;
+
+import io.opentracing.Span;
+import io.opentracing.Tracer;
+import io.opentracing.propagation.Format;
+import java.net.ConnectException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.impl.LBSolrClient;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.request.SolrRequestInfo;
+import org.apache.solr.util.tracing.GlobalTracer;
+import org.apache.solr.util.tracing.SolrRequestCarrier;
+import org.slf4j.MDC;
+
+class ShardRequestor implements Callable<ShardResponse> {
+  private final ShardRequest sreq;
+  private final String shard;
+  private final ModifiableSolrParams params;
+  private final Tracer tracer;
+  private final Span span;
+  private final List<String> urls;
+  private final HttpShardHandler httpShardHandler;
+
+  // maps "localhost:8983|localhost:7574" to a shuffled List("http://localhost:8983","http://localhost:7574")
+  // This is primarily to keep track of what order we should use to query the replicas of a shard
+  // so that we use the same replica for all phases of a distributed request.
+  private Map<String, List<String>> shardToURLs = new HashMap<>();
+
+  public ShardRequestor(ShardRequest sreq, String shard, ModifiableSolrParams params, HttpShardHandler httpShardHandler) {
+    this.sreq = sreq;
+    this.shard = shard;
+    this.params = params;
+    this.httpShardHandler = httpShardHandler;
+    // do this before call() for thread safety reasons
+    this.urls = getURLs(shard);
+    tracer = GlobalTracer.getTracer();
+    span = tracer != null ? tracer.activeSpan() : null;
+  }
+
+
+  // Not thread safe... don't use in Callable.
+  // Don't modify the returned URL list.
+  private List<String> getURLs(String shard) {
+    List<String> urls = shardToURLs.get(shard);
+    if (urls == null) {
+      urls = httpShardHandler.httpShardHandlerFactory.buildURLList(shard);
+      shardToURLs.put(shard, urls);
+    }
+    return urls;
+  }
+
+  void init() {
+    if (shard != null) {
+      MDC.put("ShardRequest.shards", shard);
+    }
+    if (urls != null && !urls.isEmpty()) {
+      MDC.put("ShardRequest.urlList", urls.toString());
+    }
+  }
+
+  void end() {
+    MDC.remove("ShardRequest.shards");
+    MDC.remove("ShardRequest.urlList");
+  }
+
+  @Override
+  public ShardResponse call() throws Exception {
+
+    ShardResponse srsp = new ShardResponse();
+    if (sreq.nodeName != null) {
+      srsp.setNodeName(sreq.nodeName);
+    }
+    srsp.setShardRequest(sreq);
+    srsp.setShard(shard);
+    SimpleSolrResponse ssr = new SimpleSolrResponse();
+    srsp.setSolrResponse(ssr);
+    long startTime = System.nanoTime();
+
+    try {
+      params.remove(CommonParams.WT); // use default (currently javabin)
+      params.remove(CommonParams.VERSION);
+
+      QueryRequest req = httpShardHandler.makeQueryRequest(sreq, params, shard);
+      if (tracer != null && span != null) {
+        tracer.inject(span.context(), Format.Builtin.HTTP_HEADERS, new SolrRequestCarrier(req));
+      }
+      req.setMethod(SolrRequest.METHOD.POST);
+      SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
+      if (requestInfo != null) req.setUserPrincipal(requestInfo.getReq().getUserPrincipal());
+
+      // no need to set the response parser as binary is the defaultJab
+      // req.setResponseParser(new BinaryResponseParser());
+
+      // if there are no shards available for a slice, urls.size()==0
+      if (urls.size() == 0) {
+        // TODO: what's the right error code here? We should use the same thing when
+        // all of the servers for a shard are down.
+        throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "no servers hosting shard: " + shard);
+      }
+
+      if (urls.size() <= 1) {
+        String url = urls.get(0);
+        srsp.setShardAddress(url);
+        ssr.nl = httpShardHandler.request(url, req);
+      } else {
+        LBSolrClient.Rsp rsp = httpShardHandler.httpShardHandlerFactory.makeLoadBalancedRequest(req, urls);
+        ssr.nl = rsp.getResponse();
+        srsp.setShardAddress(rsp.getServer());
+      }
+    } catch (ConnectException cex) {
+      srsp.setException(cex); //????
+    } catch (Exception th) {
+      srsp.setException(th);
+      if (th instanceof SolrException) {
+        srsp.setResponseCode(((SolrException) th).code());
+      } else {
+        srsp.setResponseCode(-1);
+      }
+    }
+
+    ssr.elapsedTime = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+
+    return httpShardHandler.transfomResponse(sreq, srsp, shard);
+  }
+
+  static class SimpleSolrResponse extends SolrResponse {
+
+    long elapsedTime;
+
+    NamedList<Object> nl;
+
+    @Override
+    public long getElapsedTime() {
+      return elapsedTime;
+    }
+
+    @Override
+    public NamedList<Object> getResponse() {
+      return nl;
+    }
+
+    @Override
+    public void setResponse(NamedList<Object> rsp) {
+      nl = rsp;
+    }
+
+    @Override
+    public void setElapsedTime(long elapsedTime) {
+      this.elapsedTime = elapsedTime;
+    }
+  }
+}