You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/10/25 07:31:26 UTC

lucene-solr:jira/http2: Http2SolrClient support async as default

Repository: lucene-solr
Updated Branches:
  refs/heads/jira/http2 814bf425e -> e3ee220ab


Http2SolrClient support async as default


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/e3ee220a
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/e3ee220a
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/e3ee220a

Branch: refs/heads/jira/http2
Commit: e3ee220abe23e513752bbb76e35c075536f62d3c
Parents: 814bf42
Author: Cao Manh Dat <da...@apache.org>
Authored: Thu Oct 25 14:31:18 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Thu Oct 25 14:31:18 2018 +0700

----------------------------------------------------------------------
 .../solr/client/solrj/impl/Http2SolrClient.java | 173 +++++++------------
 1 file changed, 67 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/e3ee220a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 8d73021..54852d5 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -32,11 +32,13 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Phaser;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.http.HttpStatus;
@@ -212,19 +214,13 @@ public class Http2SolrClient extends SolrClient {
     assert ObjectReleaseTracker.release(this);
   }
 
-  public void request(SolrRequest solrRequest, String collection, OnComplete onComplete)
-      throws SolrServerException, IOException {
-    request(solrRequest, collection, onComplete, false);
-  }
-
   private boolean isV2ApiRequest(final SolrRequest request) {
     return request instanceof V2Request || request.getPath().contains("/____v2");
   }
 
-  private Http2ClientResponse request(SolrRequest solrRequest,
+  public void request(SolrRequest solrRequest,
                                       String collection,
-                                      OnComplete onComplete,
-                                      boolean returnStream) throws IOException, SolrServerException {
+                                      OnComplete onComplete) throws IOException, SolrServerException {
     Request req = makeRequest(solrRequest, collection);
     setBasicAuthHeader(solrRequest, req);
     for (HttpListenerFactory factory : listenerFactory) {
@@ -234,96 +230,32 @@ public class Http2SolrClient extends SolrClient {
       req.onComplete(listener);
     }
 
-    try {
-      if (onComplete != null) {
-        req.onRequestQueued(asyncTracker.queuedListener)
-            .onComplete(asyncTracker.completeListener).send(new BufferingResponseListener() {
-
-          @Override
-          public void onComplete(Result result) {
-            if (result.isFailed()) {
-              onComplete.onFailure(result.getFailure());
-              return;
-            }
+    req.onRequestQueued(asyncTracker.queuedListener)
+        // maximum 6MB for buffering response
+        .onComplete(asyncTracker.completeListener).send(new BufferingResponseListener(6*1024*1024) {
 
-            // TODO: should we stream this?
-            try (InputStream ris = getContentAsInputStream()) {
-              NamedList<Object> rsp;
-              try {
-                rsp = processErrorsAndResponse(result.getResponse(),
-                    parser, ris, getEncoding(), isV2ApiRequest(solrRequest));
-                onComplete.onSuccess(rsp);
-              } catch (Exception e) {
-                onComplete.onFailure(e);
-              }
-            } catch (IOException e1) {
+      @Override
+      public void onComplete(Result result) {
+        if (result.isFailed()) {
+          if (onComplete != null) {
+            onComplete.onFailure(result.getFailure());
+          }
+        } else {
+          try (InputStream ris = getContentAsInputStream()) {
+            NamedList<Object> rsp;
+            rsp = processErrorsAndResponse(result.getResponse(),
+                parser, ris, getEncoding(), isV2ApiRequest(solrRequest));
+            if (onComplete != null)
+              onComplete.onSuccess(rsp);
+          } catch (Exception e1) {
+            if (onComplete != null) {
               onComplete.onFailure(e1);
             }
           }
-        });
-        return null;
-      } else {
-        Http2ClientResponse arsp = new Http2ClientResponse();
-        if (returnStream) {
-          InputStreamResponseListener listener = new InputStreamResponseListener();
-          req.send(listener);
-          // Wait for the response headers to arrive
-          listener.get(idleTimeout, TimeUnit.SECONDS);
-          // TODO: process response
-          arsp.stream = listener.getInputStream();
-        } else {
-          ContentResponse response = senReqSync(req);
-          ByteArrayInputStream is = new ByteArrayInputStream(response.getContent());
-          arsp.response = processErrorsAndResponse(response, parser,
-              is, response.getEncoding(), isV2ApiRequest(solrRequest));
         }
-        return arsp;
       }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    } catch (TimeoutException e) {
-      throw new SolrServerException(
-          "Timeout occured while waiting response from server at: "
-              + getBaseURL(), e);
-    } catch (ExecutionException e) {
-      Throwable cause = e.getCause();
-      if (cause instanceof ConnectException) {
-        throw new SolrServerException("Server refused connection at: "
-            + getBaseURL(), cause);
-      }
-      if (cause instanceof SolrServerException) {
-        throw (SolrServerException) cause;
-      } else if (cause instanceof IOException) {
-        throw new SolrServerException(
-            "IOException occured when talking to server at: " + getBaseURL(), cause);
-      }
-      throw new SolrServerException(cause.getMessage(), cause);
-    }
-  }
+    });
 
-  private ContentResponse senReqSync(Request req) throws InterruptedException, TimeoutException, ExecutionException {
-    // req.send() method will throw exception when response is more than 2MB,
-    // by passing a responseListener we can overcome the problem, default buffer size is 20MB
-    FutureResponseListener listener = new FutureResponseListener(req, 20*1024*1024);
-    req.send(listener);
-    try {
-      return listener.get();
-    } catch (ExecutionException x) {
-      // the exception handling is copied from HttpRequest.send()
-      if (x.getCause() instanceof TimeoutException)
-      {
-        TimeoutException t = (TimeoutException) (x.getCause());
-        req.abort(t);
-        throw t;
-      }
-
-      req.abort(x);
-      throw x;
-    } catch (Throwable x) {
-      req.abort(x);
-      throw x;
-    }
   }
 
   private void setBasicAuthHeader(SolrRequest solrRequest, Request req) throws UnsupportedEncodingException {
@@ -567,25 +499,59 @@ public class Http2SolrClient extends SolrClient {
     return rsp;
   }
 
+  private static class SyncOnComplete implements OnComplete {
+
+    CountDownLatch latch = new CountDownLatch(1);
+    NamedList<Object> result;
+    Throwable t;
+
+
+    @Override
+    public void onSuccess(NamedList<Object> result) {
+      this.result = result;
+      latch.countDown();
+    }
+
+    @Override
+    public void onFailure(Throwable e) {
+      this.t = e;
+      latch.countDown();
+    }
+
+    void waitForResult() throws InterruptedException {
+      latch.await();
+    }
+  }
+
   @Override
   public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
-    return request(request, collection, null, false).response;
+    SyncOnComplete syncOnComplete = new SyncOnComplete();
+    request(request, collection, syncOnComplete);
+    try {
+      syncOnComplete.waitForResult();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+    if (syncOnComplete.result != null) {
+      return syncOnComplete.result;
+    }
+    if (syncOnComplete.t != null) {
+      if (syncOnComplete.t instanceof IOException) {
+        throw new SolrServerException(
+            "IOException occured when talking to server at: " + getBaseURL(), syncOnComplete.t);
+      }
+      throw new SolrServerException(syncOnComplete.t.getMessage(), syncOnComplete.t);
+    }
+    throw new IllegalStateException("Request did not return an exception or result");
   }
 
   public void setRequestWriter(RequestWriter requestWriter) {
     this.requestWriter = requestWriter;
   }
 
-  private InputStream queryAndStreamResponse(String collection, SolrParams params)
-      throws SolrServerException, IOException {
-    QueryRequest queryRequest = new QueryRequest(params);
-    Http2ClientResponse resp = request(queryRequest, collection, null, true);
-    assert resp.stream != null;
-    return resp.stream;
-  }
-
   public interface OnComplete {
-    void onSuccess(NamedList result);
+    void onSuccess(NamedList<Object> result);
 
     void onFailure(Throwable e);
   }
@@ -737,11 +703,6 @@ public class Http2SolrClient extends SolrClient {
     }
   }
 
-  protected static class Http2ClientResponse {
-    NamedList response;
-    InputStream stream;
-  }
-
   public Set<String> getQueryParams() {
     return queryParams;
   }