You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/10/30 09:14:03 UTC
lucene-solr:jira/http2: Temporary change Http2SolrClient for benchmark
Repository: lucene-solr
Updated Branches:
refs/heads/jira/http2 4943e412e -> 3d536ed54
Temporary change Http2SolrClient for benchmark
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/3d536ed5
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/3d536ed5
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/3d536ed5
Branch: refs/heads/jira/http2
Commit: 3d536ed5492b8f74efe8d4bc1cb5e00169939fdb
Parents: 4943e41
Author: Cao Manh Dat <da...@apache.org>
Authored: Tue Oct 30 09:13:51 2018 +0000
Committer: Cao Manh Dat <da...@apache.org>
Committed: Tue Oct 30 09:13:51 2018 +0000
----------------------------------------------------------------------
.../component/HttpShardHandlerFactory.java | 4 +-
.../solr/security/PKIAuthenticationPlugin.java | 2 +-
.../apache/solr/update/UpdateShardHandler.java | 5 +-
.../HttpParamDelegationTokenPlugin.java | 2 +-
.../solr/update/MockingHttp2SolrClient.java | 10 +-
.../solr/client/solrj/impl/Http2SolrClient.java | 514 ++++++++++---------
6 files changed, 270 insertions(+), 267 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d536ed5/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
index c5207e6..fcd705b 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
@@ -205,9 +205,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
this.defaultClient = new Http2SolrClient.Builder()
.connectionTimeout(connectionTimeout)
- .idleTimeout(soTimeout)
- .maxConnectionsPerHost(maxConnectionsPerHost).build();
- this.defaultClient.addListenerFactory(this.httpListenerFactory);
+ .idleTimeout(soTimeout).build();
this.loadbalancer = new LBHttp2SolrClient(defaultClient);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d536ed5/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java b/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java
index 6f85d95..a311fc3 100644
--- a/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java
+++ b/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java
@@ -232,7 +232,7 @@ public class PKIAuthenticationPlugin extends AuthenticationPlugin implements Htt
generateToken().ifPresent(s -> request.header(HEADER, myNodeName + " " + s));
}
};
- client.addListenerFactory(() -> listener);
+// client.addListenerFactory(() -> listener);
}
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d536ed5/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
index 4af1ae7..f3f072d 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
@@ -110,11 +110,10 @@ public class UpdateShardHandler implements SolrMetricProducer, SolrInfoBean {
if (cfg != null) {
updateOnlyClientBuilder
.connectionTimeout(cfg.getDistributedConnectionTimeout())
- .idleTimeout(cfg.getDistributedSocketTimeout())
- .maxConnectionsPerHost(cfg.getMaxUpdateConnectionsPerHost());
+ .idleTimeout(cfg.getDistributedSocketTimeout());
}
updateOnlyClient = updateOnlyClientBuilder.build();
- updateOnlyClient.addListenerFactory(updateHttpListenerFactory);
+// updateOnlyClient.addListenerFactory(updateHttpListenerFactory);
defaultClient = HttpClientUtil.createClient(clientParams, defaultConnectionManager, false, httpRequestExecutor);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d536ed5/solr/core/src/test/org/apache/solr/security/HttpParamDelegationTokenPlugin.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/security/HttpParamDelegationTokenPlugin.java b/solr/core/src/test/org/apache/solr/security/HttpParamDelegationTokenPlugin.java
index a8f0355..f0bc23f 100644
--- a/solr/core/src/test/org/apache/solr/security/HttpParamDelegationTokenPlugin.java
+++ b/solr/core/src/test/org/apache/solr/security/HttpParamDelegationTokenPlugin.java
@@ -155,7 +155,7 @@ public class HttpParamDelegationTokenPlugin extends KerberosPlugin {
getPrincipal().ifPresent(usr -> request.header(INTERNAL_REQUEST_HEADER, usr));
}
};
- client.addListenerFactory(() -> listener);
+// client.addListenerFactory(() -> listener);
}
@Override
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d536ed5/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java b/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java
index c9aded7..6f701be 100644
--- a/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java
+++ b/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java
@@ -118,20 +118,22 @@ public class MockingHttp2SolrClient extends Http2SolrClient {
return super.request(request, collection);
}
- public NamedList<Object> request(SolrRequest request, String collection, OnComplete onComplete)
+ public void request(SolrRequest request, String collection, OnComplete onComplete)
throws SolrServerException, IOException {
if (request instanceof UpdateRequest) {
UpdateRequest ur = (UpdateRequest) request;
// won't throw exception if request is DBQ
if (ur.getDeleteQuery() != null && !ur.getDeleteQuery().isEmpty()) {
- return super.request(request, collection, onComplete);
+ super.request(request, collection, onComplete);
+ return;
}
}
if (exp != null) {
if (oneExpPerReq) {
if (reqGotException.contains(request)) {
- return super.request(request, collection, onComplete);
+ super.request(request, collection, onComplete);
+ return;
}
else
reqGotException.add(request);
@@ -151,6 +153,6 @@ public class MockingHttp2SolrClient extends Http2SolrClient {
}
}
- return super.request(request, collection, onComplete);
+ super.request(request, collection, onComplete);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d536ed5/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 943e6c2..15923b9 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.client.solrj.impl;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -27,7 +28,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Set;
@@ -46,6 +46,7 @@ import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.V2RequestSupport;
import org.apache.solr.client.solrj.embedded.SSLConfig;
+import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.common.SolrException;
@@ -59,6 +60,7 @@ import org.apache.solr.common.util.ObjectReleaseTracker;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.ProtocolHandlers;
+import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
@@ -75,7 +77,6 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
-import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
@@ -84,14 +85,15 @@ import org.slf4j.LoggerFactory;
import static org.apache.solr.common.util.Utils.getObjectByPath;
-// TODO: error handling, small Http2SolrClient features, security, ssl
+// TODO: error handling, small Http2SolrClient features, basic auth, security, ssl, apiV2 ...
/**
* @lucene.experimental
*/
public class Http2SolrClient extends SolrClient {
private static volatile SSLConfig defaultSSLConfig;
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final int MAX_OUTSTANDING_REQUESTS = 1000;
private static final String AGENT = "Solr[" + Http2SolrClient.class.getName() + "] 2.0";
private static final String UTF_8 = StandardCharsets.UTF_8.name();
private static final String DEFAULT_PATH = "/select";
@@ -99,12 +101,40 @@ public class Http2SolrClient extends SolrClient {
private HttpClient httpClient;
private volatile Set<String> queryParams = Collections.emptySet();
+ private Phaser phaser = new Phaser(1);
+ private final Semaphore available;
private int idleTimeout;
private ResponseParser parser = new BinaryResponseParser();
private volatile RequestWriter requestWriter = new BinaryRequestWriter();
- private List<HttpListenerFactory> listenerFactory = new LinkedList<>();
- private AsyncTracker asyncTracker = new AsyncTracker();
+ private volatile HttpListenerFactory listenerFactory;
+
+ private Request.QueuedListener requestQueuedListener = new Request.QueuedListener() {
+
+ @Override
+ public void onQueued(Request request) {
+ phaser.register();
+ try {
+ available.acquire();
+ } catch (InterruptedException e) {
+
+ }
+ }
+ };
+
+ private volatile Request.BeginListener beginListener = req -> {
+
+ };
+
+ private Response.CompleteListener requestCompleteListener = new Response.CompleteListener() {
+
+ @Override
+ public void onComplete(Result arg0) {
+ phaser.arriveAndDeregister();
+ available.release();
+ }
+ };
+
/**
* The URL of the Solr server.
*/
@@ -112,6 +142,9 @@ public class Http2SolrClient extends SolrClient {
private boolean closeClient;
protected Http2SolrClient(String serverBaseUrl, Builder builder) {
+ // TODO: what about shared instances?
+ available = new Semaphore(MAX_OUTSTANDING_REQUESTS, false);
+
if (serverBaseUrl != null) {
if (!serverBaseUrl.equals("/") && serverBaseUrl.endsWith("/")) {
serverBaseUrl = serverBaseUrl.substring(0, serverBaseUrl.length() - 1);
@@ -132,6 +165,7 @@ public class Http2SolrClient extends SolrClient {
} else {
httpClient = builder.httpClient;
}
+ if (builder.beginListener != null) setBeginListener(builder.beginListener);
if (!httpClient.isStarted()) {
try {
httpClient.start();
@@ -143,166 +177,187 @@ public class Http2SolrClient extends SolrClient {
assert ObjectReleaseTracker.track(this);
}
- public void addListenerFactory(HttpListenerFactory factory) {
- this.listenerFactory.add(factory);
+ public void setBeginListener(Request.BeginListener beginListener) {
+ this.beginListener = beginListener;
}
- HttpClient getHttpClient() {
- return httpClient;
- }
-
- ProtocolHandlers getProtocolHandlers() {
+ public ProtocolHandlers getProtocolHandlers() {
return httpClient.getProtocolHandlers();
}
private HttpClient createHttpClient(Builder builder) {
+
HttpClient httpClient;
- BlockingArrayQueue<Runnable> queue = new BlockingArrayQueue<>(128, 128);
- QueuedThreadPool httpClientExecutor = new QueuedThreadPool(128, 4, 60000, queue);
+ QueuedThreadPool httpClientExecutor = new QueuedThreadPool(100, 4);
httpClientExecutor.setDaemon(true);
- SslContextFactory sslContextFactory;
- if (builder.sslConfig == null) {
- sslContextFactory = getDefaultSslContextFactory();
- } else {
- sslContextFactory = builder.sslConfig.createContextFactory();
- }
-
HttpClientTransport transport;
- if (builder.useHttp1_1) {
- log.debug("Create Http2SolrClient with HTTP/1.1 transport");
+ if (useHttp1(builder)) {
+ LOG.info("Create Http2SolrClient with HTTP/1.1 transport");
transport = new HttpClientTransportOverHTTP(2);
+
+ SslContextFactory sslContextFactory;
+ if (builder.sslConfig == null) {
+ sslContextFactory = getDefaultSslContextFactory();
+ } else {
+ sslContextFactory = builder.sslConfig.createContextFactory();
+ }
httpClient = new HttpClient(transport, sslContextFactory);
} else {
- log.debug("Create Http2SolrClient with HTTP/2 transport");
+ LOG.info("Create Http2SolrClient with HTTP/2 transport");
+ //TODO adding https support for HTTP2 when use JDK 9
HTTP2Client http2client = new HTTP2Client();
transport = new HttpClientTransportOverHTTP2(http2client);
- httpClient = new HttpClient(transport, sslContextFactory);
+ httpClient = new HttpClient(transport, null);
}
-
httpClient.setExecutor(httpClientExecutor);
httpClient.setStrictEventOrdering(false);
httpClient.setConnectBlocking(true);
httpClient.setFollowRedirects(false);
- httpClient.setMaxRequestsQueuedPerDestination(asyncTracker.getMaxRequestsQueuedPerDestination());
+ httpClient.setMaxConnectionsPerDestination(4);
+ httpClient.setMaxRequestsQueuedPerDestination(MAX_OUTSTANDING_REQUESTS * 4); // comfortably above max outstanding // requests
httpClient.setUserAgentField(new HttpField(HttpHeader.USER_AGENT, AGENT));
-// if (builder.maxConnectionsPerHost != null) httpClient.setMaxConnectionsPerDestination(builder.maxConnectionsPerHost);
- httpClient.setMaxConnectionsPerDestination(4);
if (builder.idleTimeout != null) httpClient.setIdleTimeout(builder.idleTimeout);
if (builder.connectionTimeout != null) httpClient.setConnectTimeout(builder.connectionTimeout);
return httpClient;
}
+ private boolean useHttp1(Builder builder) {
+ if (serverBaseUrl != null && serverBaseUrl.startsWith("https"))
+ return true;
+
+ if (builder.useHttp1_1 || builder.sslConfig != null)
+ return true;
+
+ if (System.getProperty("javax.net.ssl.trustStore") != null)
+ return true;
+
+ return false;
+ }
+
+ public HttpClient getHttpClient() {
+ return httpClient;
+ }
+
public void close() {
// we wait for async requests, so far devs don't want to give sugar for this
- asyncTracker.waitForComplete();
+ phaser.arriveAndAwaitAdvance();
+ phaser.arriveAndDeregister();
if (closeClient) {
- try {
- // TODO: stop time?
- httpClient.setStopTimeout(1000);
- httpClient.stop();
- } catch (Exception e) {
- throw new RuntimeException("Exception on closing client", e);
- }
+ close(httpClient);
}
assert ObjectReleaseTracker.release(this);
}
+ public static void close(HttpClient httpClient) {
+ try {
+ // TODO: stop time?
+ httpClient.setStopTimeout(1000);
+ httpClient.stop();
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void request(SolrRequest solrRequest, String collection, OnComplete onComplete)
+ throws SolrServerException, IOException {
+ request(solrRequest, collection, onComplete, false);
+ }
+
private boolean isV2ApiRequest(final SolrRequest request) {
return request instanceof V2Request || request.getPath().contains("/____v2");
}
- public NamedList<Object> request(SolrRequest solrRequest,
+ private Http2ClientResponse request(SolrRequest solrRequest,
String collection,
- OnComplete onComplete) throws IOException, SolrServerException {
+ OnComplete onComplete,
+ boolean returnStream) throws IOException, SolrServerException {
Request req = makeRequest(solrRequest, collection);
setBasicAuthHeader(solrRequest, req);
- for (HttpListenerFactory factory : listenerFactory) {
- HttpListenerFactory.RequestResponseListener listener = factory.get();
- req.onRequestQueued(listener);
+ if (listenerFactory != null) {
+ HttpListenerFactory.RequestResponseListener listener = listenerFactory.get();
req.onRequestBegin(listener);
req.onComplete(listener);
}
- if (onComplete != null) {
- // This async call only suitable for indexing since the response size is limited by 5MB
- req.onRequestQueued(asyncTracker.queuedListener)
- .onComplete(asyncTracker.completeListener).send(new BufferingResponseListener(5 * 1024 * 1024) {
-
- @Override
- public void onComplete(Result result) {
- if (result.isFailed()) {
- onComplete.onFailure(result.getFailure());
- return;
- }
+ if (beginListener != null) {
+ // By calling listener here, we will make sure that SolrRequestInfo can be get from the same thread
+ beginListener.onBegin(req);
+ }
+ try {
+ if (onComplete != null) {
+ req.onRequestQueued(requestQueuedListener)
+ .onComplete(requestCompleteListener).send(new BufferingResponseListener() {
+
+ @Override
+ public void onComplete(Result result) {
+ if (result.isFailed()) {
+ onComplete.onFailure(result.getFailure());
+ return;
+ }
- NamedList<Object> rsp;
- try {
- InputStream is = getContentAsInputStream();
- assert ObjectReleaseTracker.track(is);
- rsp = processErrorsAndResponse(result.getResponse(),
- parser, is, getEncoding(), isV2ApiRequest(solrRequest));
- onComplete.onSuccess(rsp);
- } catch (Exception e) {
- onComplete.onFailure(e);
+ // TODO: should we stream this?
+ try (InputStream ris = getContentAsInputStream()) {
+ NamedList<Object> rsp;
+ try {
+ rsp = processErrorsAndResponse(result.getResponse(),
+ parser, ris, getEncoding(), isV2ApiRequest(solrRequest));
+ onComplete.onSuccess(rsp);
+ } catch (Exception e) {
+ onComplete.onFailure(e);
+ }
+ } catch (IOException e1) {
+ onComplete.onFailure(e1);
+ }
}
+ });
+ return null;
+ } else {
+ Http2ClientResponse arsp = new Http2ClientResponse();
+ if (returnStream) {
+ InputStreamResponseListener listener = new InputStreamResponseListener();
+ req.send(listener);
+ // Wait for the response headers to arrive
+ listener.get(idleTimeout, TimeUnit.SECONDS);
+ // TODO: process response
+ arsp.stream = listener.getInputStream();
+ } else {
+ ContentResponse response = req.send();
+ ByteArrayInputStream is = new ByteArrayInputStream(response.getContent());
+ arsp.response = processErrorsAndResponse(response, parser,
+ is, response.getEncoding(), isV2ApiRequest(solrRequest));
}
- });
- return null;
- } else {
- try {
- InputStreamResponseListener listener = new InputStreamResponseListener();
- req.send(listener);
- Response response = listener.get(idleTimeout, TimeUnit.SECONDS);
- InputStream is = listener.getInputStream();
- assert ObjectReleaseTracker.track(is);
- return processErrorsAndResponse(response, parser, is, getEncoding(response), isV2ApiRequest(solrRequest));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } catch (TimeoutException e) {
+ return arsp;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (TimeoutException e) {
+ throw new SolrServerException(
+ "Timeout occured while waiting response from server at: "
+ + getBaseURL(), e);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof ConnectException) {
+ throw new SolrServerException("Server refused connection at: "
+ + getBaseURL(), cause);
+ }
+ if (cause instanceof SolrServerException) {
+ throw (SolrServerException) cause;
+ } else if (cause instanceof IOException) {
throw new SolrServerException(
- "Timeout occured while waiting response from server at: "
- + getBaseURL(), e);
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- if (cause instanceof ConnectException) {
- throw new SolrServerException("Server refused connection at: "
- + getBaseURL(), cause);
- }
- if (cause instanceof SolrServerException) {
- throw (SolrServerException) cause;
- } else if (cause instanceof IOException) {
- throw new SolrServerException(
- "IOException occured when talking to server at: " + getBaseURL(), cause);
- }
- throw new SolrServerException(cause.getMessage(), cause);
+ "IOException occured when talking to server at: " + getBaseURL(), cause);
}
+ throw new SolrServerException(cause.getMessage(), cause);
}
}
- private String getEncoding(Response response) {
- String contentType = response.getHeaders().get(HttpHeader.CONTENT_TYPE);
- if (contentType != null) {
- String charset = "charset=";
- int index = contentType.toLowerCase(Locale.ENGLISH).indexOf(charset);
- if (index > 0) {
- String encoding = contentType.substring(index + charset.length());
- // Sometimes charsets arrive with an ending semicolon.
- int semicolon = encoding.indexOf(';');
- if (semicolon > 0)
- encoding = encoding.substring(0, semicolon).trim();
- // Sometimes charsets are quoted.
- int lastIndex = encoding.length() - 1;
- if (encoding.charAt(0) == '"' && encoding.charAt(lastIndex) == '"')
- encoding = encoding.substring(1, lastIndex).trim();
- return encoding;
- }
- }
- return null;
+ public void setListenerFactory(HttpListenerFactory listenerFactory) {
+ this.listenerFactory = listenerFactory;
}
private void setBasicAuthHeader(SolrRequest solrRequest, Request req) throws UnsupportedEncodingException {
@@ -454,133 +509,117 @@ public class Http2SolrClient extends SolrClient {
return req;
}
- private boolean wantStream(final ResponseParser processor) {
- return processor == null || processor instanceof InputStreamResponseParser;
- }
-
private NamedList<Object> processErrorsAndResponse(Response response,
final ResponseParser processor,
InputStream is,
String encoding,
final boolean isV2Api)
throws SolrServerException {
- boolean shouldClose = true;
- try {
- // handle some http level checks before trying to parse the response
- int httpStatus = response.getStatus();
-
- String contentType;
- contentType = response.getHeaders().get("content-type");
- if (contentType == null) contentType = "";
-
- switch (httpStatus) {
- case HttpStatus.SC_OK:
- case HttpStatus.SC_BAD_REQUEST:
- case HttpStatus.SC_CONFLICT: // 409
- break;
- case HttpStatus.SC_MOVED_PERMANENTLY:
- case HttpStatus.SC_MOVED_TEMPORARILY:
- if (!httpClient.isFollowRedirects()) {
- throw new SolrServerException("Server at " + getBaseURL()
- + " sent back a redirect (" + httpStatus + ").");
- }
- break;
- default:
- if (processor == null || "".equals(contentType)) {
- throw new RemoteSolrException(serverBaseUrl, httpStatus, "non ok status: " + httpStatus
- + ", message:" + response.getReason(),
- null);
- }
- }
-
- if (wantStream(parser)) {
- // no processor specified, return raw stream
- NamedList<Object> rsp = new NamedList<>();
- rsp.add("stream", is);
- // Only case where stream should not be closed
- shouldClose = false;
- return rsp;
- }
+ // handle some http level checks before trying to parse the response
+ int httpStatus = response.getStatus();
+
+ String contentType;
+ contentType = response.getHeaders().get("content-type");
+ if (contentType == null) contentType = "";
+
+ switch (httpStatus) {
+ case HttpStatus.SC_OK:
+ case HttpStatus.SC_BAD_REQUEST:
+ case HttpStatus.SC_CONFLICT: // 409
+ break;
+ case HttpStatus.SC_MOVED_PERMANENTLY:
+ case HttpStatus.SC_MOVED_TEMPORARILY:
+ if (!httpClient.isFollowRedirects()) {
+ throw new SolrServerException("Server at " + getBaseURL()
+ + " sent back a redirect (" + httpStatus + ").");
+ }
+ break;
+ default:
+ if (processor == null || "".equals(contentType)) {
+ throw new RemoteSolrException(serverBaseUrl, httpStatus, "non ok status: " + httpStatus
+ + ", message:" + response.getReason(),
+ null);
+ }
+ }
- String procCt = processor.getContentType();
- if (procCt != null) {
- String procMimeType = ContentType.parse(procCt).getMimeType().trim().toLowerCase(Locale.ROOT);
- String mimeType = ContentType.parse(contentType).getMimeType().trim().toLowerCase(Locale.ROOT);
- if (!procMimeType.equals(mimeType)) {
- // unexpected mime type
- String msg = "Expected mime type " + procMimeType + " but got " + mimeType + ".";
- try {
- msg = msg + " " + IOUtils.toString(is, encoding);
- } catch (IOException e) {
- throw new RemoteSolrException(serverBaseUrl, httpStatus, "Could not parse response with encoding " + encoding, e);
- }
- throw new RemoteSolrException(serverBaseUrl, httpStatus, msg, null);
+ String procCt = processor.getContentType();
+ if (procCt != null) {
+ String procMimeType = ContentType.parse(procCt).getMimeType().trim().toLowerCase(Locale.ROOT);
+ String mimeType = ContentType.parse(contentType).getMimeType().trim().toLowerCase(Locale.ROOT);
+ if (!procMimeType.equals(mimeType)) {
+ // unexpected mime type
+ String msg = "Expected mime type " + procMimeType + " but got " + mimeType + ".";
+ try {
+ msg = msg + " " + IOUtils.toString(is, encoding);
+ } catch (IOException e) {
+ throw new RemoteSolrException(serverBaseUrl, httpStatus, "Could not parse response with encoding " + encoding, e);
}
+ throw new RemoteSolrException(serverBaseUrl, httpStatus, msg, null);
}
+ }
- NamedList<Object> rsp;
- try {
- rsp = parser.processResponse(is, encoding);
- } catch (Exception e) {
- throw new RemoteSolrException(serverBaseUrl, httpStatus, e.getMessage(), e);
- }
+ NamedList<Object> rsp;
+ try {
+ rsp = parser.processResponse(is, encoding);
+ } catch (Exception e) {
+ throw new RemoteSolrException(serverBaseUrl, httpStatus, e.getMessage(), e);
+ }
- Object error = rsp == null ? null : rsp.get("error");
- if (error != null && (String.valueOf(getObjectByPath(error, true, errPath)).endsWith("ExceptionWithErrObject"))) {
- throw RemoteExecutionException.create(serverBaseUrl, rsp);
- }
- if (httpStatus != HttpStatus.SC_OK && !isV2Api) {
- NamedList<String> metadata = null;
- String reason = null;
- try {
- NamedList err = (NamedList) rsp.get("error");
- if (err != null) {
- reason = (String) err.get("msg");
- if (reason == null) {
- reason = (String) err.get("trace");
- }
- metadata = (NamedList<String>) err.get("metadata");
- }
- } catch (Exception ex) {}
- if (reason == null) {
- StringBuilder msg = new StringBuilder();
- msg.append(response.getReason())
- .append("\n\n")
- .append("request: ")
- .append(response.getRequest().getMethod());
- try {
- reason = java.net.URLDecoder.decode(msg.toString(), UTF_8);
- } catch (UnsupportedEncodingException e) {
+ Object error = rsp == null ? null : rsp.get("error");
+ if (error != null && (String.valueOf(getObjectByPath(error, true, errPath)).endsWith("ExceptionWithErrObject"))) {
+ throw RemoteExecutionException.create(serverBaseUrl, rsp);
+ }
+ if (httpStatus != HttpStatus.SC_OK && !isV2Api) {
+ NamedList<String> metadata = null;
+ String reason = null;
+ try {
+ NamedList err = (NamedList) rsp.get("error");
+ if (err != null) {
+ reason = (String) err.get("msg");
+ if (reason == null) {
+ reason = (String) err.get("trace");
}
+ metadata = (NamedList<String>) err.get("metadata");
}
- RemoteSolrException rss = new RemoteSolrException(serverBaseUrl, httpStatus, reason, null);
- if (metadata != null) rss.setMetadata(metadata);
- throw rss;
+ } catch (Exception ex) {
}
- return rsp;
- } finally {
- if (shouldClose) {
+ if (reason == null) {
+ StringBuilder msg = new StringBuilder();
+ msg.append(response.getReason())
+ .append("\n\n")
+ .append("request: ")
+ .append(response.getRequest().getMethod());
try {
- is.close();
- assert ObjectReleaseTracker.release(is);
- } catch (IOException e) {
- // quitely
+ reason = java.net.URLDecoder.decode(msg.toString(), UTF_8);
+ } catch (UnsupportedEncodingException e) {
}
}
+ RemoteSolrException rss = new RemoteSolrException(serverBaseUrl, httpStatus, reason, null);
+ if (metadata != null) rss.setMetadata(metadata);
+ throw rss;
}
+ return rsp;
}
@Override
public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
- return request(request, collection, null);
+ return request(request, collection, null, false).response;
}
public void setRequestWriter(RequestWriter requestWriter) {
this.requestWriter = requestWriter;
}
+ private InputStream queryAndStreamResponse(String collection, SolrParams params)
+ throws SolrServerException, IOException {
+ QueryRequest queryRequest = new QueryRequest(params);
+ Http2ClientResponse resp = request(queryRequest, collection, null, true);
+ assert resp.stream != null;
+ return resp.stream;
+ }
+
public interface OnComplete {
- void onSuccess(NamedList<Object> result);
+ void onSuccess(NamedList result);
void onFailure(Throwable e);
}
@@ -593,54 +632,15 @@ public class Http2SolrClient extends SolrClient {
return serverBaseUrl;
}
- private static class AsyncTracker {
- private static final int MAX_OUTSTANDING_REQUESTS = 1000;
-
- // wait for async requests
- private final Phaser phaser;
- // maximum outstanding requests left
- private final Semaphore available;
- private final Request.QueuedListener queuedListener;
- private final Response.CompleteListener completeListener;
-
- AsyncTracker() {
- // TODO: what about shared instances?
- phaser = new Phaser(1);
- available = new Semaphore(MAX_OUTSTANDING_REQUESTS, false);
- queuedListener = request -> {
- phaser.register();
- try {
- available.acquire();
- } catch (InterruptedException ignored) {
-
- }
- };
- completeListener = result -> {
- phaser.arriveAndDeregister();
- available.release();
- };
- }
-
- int getMaxRequestsQueuedPerDestination() {
- // comfortably above max outstanding requests
- return MAX_OUTSTANDING_REQUESTS * 3;
- }
-
- public void waitForComplete() {
- phaser.arriveAndAwaitAdvance();
- phaser.arriveAndDeregister();
- }
- }
-
public static class Builder {
private HttpClient httpClient;
private SSLConfig sslConfig = defaultSSLConfig;
private Integer idleTimeout;
private Integer connectionTimeout;
- private Integer maxConnectionsPerHost;
private boolean useHttp1_1 = Boolean.getBoolean("solr.http1");
protected String baseSolrUrl;
+ private Request.BeginListener beginListener = request -> {};
public Builder() {
@@ -664,11 +664,6 @@ public class Http2SolrClient extends SolrClient {
return this;
}
- public Builder maxConnectionsPerHost(int max) {
- this.maxConnectionsPerHost = max;
- return this;
- }
-
public Builder idleTimeout(int idleConnectionTimeout) {
this.idleTimeout = idleConnectionTimeout;
return this;
@@ -684,6 +679,10 @@ public class Http2SolrClient extends SolrClient {
return this;
}
+ public Builder withListener(Request.BeginListener beginListener) {
+ this.beginListener = beginListener;
+ return this;
+ }
}
/**
@@ -732,6 +731,11 @@ public class Http2SolrClient extends SolrClient {
}
}
+ protected static class Http2ClientResponse {
+ NamedList response;
+ InputStream stream;
+ }
+
public Set<String> getQueryParams() {
return queryParams;
}
@@ -816,4 +820,4 @@ public class Http2SolrClient extends SolrClient {
return sslContextFactory;
}
-}
+}
\ No newline at end of file