You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/10/25 07:31:26 UTC
lucene-solr:jira/http2: Http2SolrClient support async as default
Repository: lucene-solr
Updated Branches:
refs/heads/jira/http2 814bf425e -> e3ee220ab
Http2SolrClient support async as default
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/e3ee220a
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/e3ee220a
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/e3ee220a
Branch: refs/heads/jira/http2
Commit: e3ee220abe23e513752bbb76e35c075536f62d3c
Parents: 814bf42
Author: Cao Manh Dat <da...@apache.org>
Authored: Thu Oct 25 14:31:18 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Thu Oct 25 14:31:18 2018 +0700
----------------------------------------------------------------------
.../solr/client/solrj/impl/Http2SolrClient.java | 173 +++++++------------
1 file changed, 67 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3ee220a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 8d73021..54852d5 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -32,11 +32,13 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.IOUtils;
import org.apache.http.HttpStatus;
@@ -212,19 +214,13 @@ public class Http2SolrClient extends SolrClient {
assert ObjectReleaseTracker.release(this);
}
- public void request(SolrRequest solrRequest, String collection, OnComplete onComplete)
- throws SolrServerException, IOException {
- request(solrRequest, collection, onComplete, false);
- }
-
private boolean isV2ApiRequest(final SolrRequest request) {
return request instanceof V2Request || request.getPath().contains("/____v2");
}
- private Http2ClientResponse request(SolrRequest solrRequest,
+ public void request(SolrRequest solrRequest,
String collection,
- OnComplete onComplete,
- boolean returnStream) throws IOException, SolrServerException {
+ OnComplete onComplete) throws IOException, SolrServerException {
Request req = makeRequest(solrRequest, collection);
setBasicAuthHeader(solrRequest, req);
for (HttpListenerFactory factory : listenerFactory) {
@@ -234,96 +230,32 @@ public class Http2SolrClient extends SolrClient {
req.onComplete(listener);
}
- try {
- if (onComplete != null) {
- req.onRequestQueued(asyncTracker.queuedListener)
- .onComplete(asyncTracker.completeListener).send(new BufferingResponseListener() {
-
- @Override
- public void onComplete(Result result) {
- if (result.isFailed()) {
- onComplete.onFailure(result.getFailure());
- return;
- }
+ req.onRequestQueued(asyncTracker.queuedListener)
+ // maximum 6MB for buffering response
+ .onComplete(asyncTracker.completeListener).send(new BufferingResponseListener(6*1024*1024) {
- // TODO: should we stream this?
- try (InputStream ris = getContentAsInputStream()) {
- NamedList<Object> rsp;
- try {
- rsp = processErrorsAndResponse(result.getResponse(),
- parser, ris, getEncoding(), isV2ApiRequest(solrRequest));
- onComplete.onSuccess(rsp);
- } catch (Exception e) {
- onComplete.onFailure(e);
- }
- } catch (IOException e1) {
+ @Override
+ public void onComplete(Result result) {
+ if (result.isFailed()) {
+ if (onComplete != null) {
+ onComplete.onFailure(result.getFailure());
+ }
+ } else {
+ try (InputStream ris = getContentAsInputStream()) {
+ NamedList<Object> rsp;
+ rsp = processErrorsAndResponse(result.getResponse(),
+ parser, ris, getEncoding(), isV2ApiRequest(solrRequest));
+ if (onComplete != null)
+ onComplete.onSuccess(rsp);
+ } catch (Exception e1) {
+ if (onComplete != null) {
onComplete.onFailure(e1);
}
}
- });
- return null;
- } else {
- Http2ClientResponse arsp = new Http2ClientResponse();
- if (returnStream) {
- InputStreamResponseListener listener = new InputStreamResponseListener();
- req.send(listener);
- // Wait for the response headers to arrive
- listener.get(idleTimeout, TimeUnit.SECONDS);
- // TODO: process response
- arsp.stream = listener.getInputStream();
- } else {
- ContentResponse response = senReqSync(req);
- ByteArrayInputStream is = new ByteArrayInputStream(response.getContent());
- arsp.response = processErrorsAndResponse(response, parser,
- is, response.getEncoding(), isV2ApiRequest(solrRequest));
}
- return arsp;
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } catch (TimeoutException e) {
- throw new SolrServerException(
- "Timeout occured while waiting response from server at: "
- + getBaseURL(), e);
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- if (cause instanceof ConnectException) {
- throw new SolrServerException("Server refused connection at: "
- + getBaseURL(), cause);
- }
- if (cause instanceof SolrServerException) {
- throw (SolrServerException) cause;
- } else if (cause instanceof IOException) {
- throw new SolrServerException(
- "IOException occured when talking to server at: " + getBaseURL(), cause);
- }
- throw new SolrServerException(cause.getMessage(), cause);
- }
- }
+ });
- private ContentResponse senReqSync(Request req) throws InterruptedException, TimeoutException, ExecutionException {
- // req.send() method will throw exception when response is more than 2MB,
- // by passing a responseListener we can overcome the problem, default buffer size is 20MB
- FutureResponseListener listener = new FutureResponseListener(req, 20*1024*1024);
- req.send(listener);
- try {
- return listener.get();
- } catch (ExecutionException x) {
- // the exception handling is copied from HttpRequest.send()
- if (x.getCause() instanceof TimeoutException)
- {
- TimeoutException t = (TimeoutException) (x.getCause());
- req.abort(t);
- throw t;
- }
-
- req.abort(x);
- throw x;
- } catch (Throwable x) {
- req.abort(x);
- throw x;
- }
}
private void setBasicAuthHeader(SolrRequest solrRequest, Request req) throws UnsupportedEncodingException {
@@ -567,25 +499,59 @@ public class Http2SolrClient extends SolrClient {
return rsp;
}
+ private static class SyncOnComplete implements OnComplete {
+
+ CountDownLatch latch = new CountDownLatch(1);
+ NamedList<Object> result;
+ Throwable t;
+
+
+ @Override
+ public void onSuccess(NamedList<Object> result) {
+ this.result = result;
+ latch.countDown();
+ }
+
+ @Override
+ public void onFailure(Throwable e) {
+ this.t = e;
+ latch.countDown();
+ }
+
+ void waitForResult() throws InterruptedException {
+ latch.await();
+ }
+ }
+
@Override
public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
- return request(request, collection, null, false).response;
+ SyncOnComplete syncOnComplete = new SyncOnComplete();
+ request(request, collection, syncOnComplete);
+ try {
+ syncOnComplete.waitForResult();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ if (syncOnComplete.result != null) {
+ return syncOnComplete.result;
+ }
+ if (syncOnComplete.t != null) {
+ if (syncOnComplete.t instanceof IOException) {
+ throw new SolrServerException(
+ "IOException occured when talking to server at: " + getBaseURL(), syncOnComplete.t);
+ }
+ throw new SolrServerException(syncOnComplete.t.getMessage(), syncOnComplete.t);
+ }
+ throw new IllegalStateException("Request did not return an exception or result");
}
public void setRequestWriter(RequestWriter requestWriter) {
this.requestWriter = requestWriter;
}
- private InputStream queryAndStreamResponse(String collection, SolrParams params)
- throws SolrServerException, IOException {
- QueryRequest queryRequest = new QueryRequest(params);
- Http2ClientResponse resp = request(queryRequest, collection, null, true);
- assert resp.stream != null;
- return resp.stream;
- }
-
public interface OnComplete {
- void onSuccess(NamedList result);
+ void onSuccess(NamedList<Object> result);
void onFailure(Throwable e);
}
@@ -737,11 +703,6 @@ public class Http2SolrClient extends SolrClient {
}
}
- protected static class Http2ClientResponse {
- NamedList response;
- InputStream stream;
- }
-
public Set<String> getQueryParams() {
return queryParams;
}