You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/10/30 09:14:03 UTC

lucene-solr:jira/http2: Temporary change Http2SolrClient for benchmark

Repository: lucene-solr
Updated Branches:
  refs/heads/jira/http2 4943e412e -> 3d536ed54


Temporary change Http2SolrClient for benchmark


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/3d536ed5
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/3d536ed5
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/3d536ed5

Branch: refs/heads/jira/http2
Commit: 3d536ed5492b8f74efe8d4bc1cb5e00169939fdb
Parents: 4943e41
Author: Cao Manh Dat <da...@apache.org>
Authored: Tue Oct 30 09:13:51 2018 +0000
Committer: Cao Manh Dat <da...@apache.org>
Committed: Tue Oct 30 09:13:51 2018 +0000

----------------------------------------------------------------------
 .../component/HttpShardHandlerFactory.java      |   4 +-
 .../solr/security/PKIAuthenticationPlugin.java  |   2 +-
 .../apache/solr/update/UpdateShardHandler.java  |   5 +-
 .../HttpParamDelegationTokenPlugin.java         |   2 +-
 .../solr/update/MockingHttp2SolrClient.java     |  10 +-
 .../solr/client/solrj/impl/Http2SolrClient.java | 514 ++++++++++---------
 6 files changed, 270 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d536ed5/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
index c5207e6..fcd705b 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
@@ -205,9 +205,7 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory implements org.
 
     this.defaultClient = new Http2SolrClient.Builder()
         .connectionTimeout(connectionTimeout)
-        .idleTimeout(soTimeout)
-        .maxConnectionsPerHost(maxConnectionsPerHost).build();
-    this.defaultClient.addListenerFactory(this.httpListenerFactory);
+        .idleTimeout(soTimeout).build();
     this.loadbalancer = new LBHttp2SolrClient(defaultClient);
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d536ed5/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java b/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java
index 6f85d95..a311fc3 100644
--- a/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java
+++ b/solr/core/src/java/org/apache/solr/security/PKIAuthenticationPlugin.java
@@ -232,7 +232,7 @@ public class PKIAuthenticationPlugin extends AuthenticationPlugin implements Htt
         generateToken().ifPresent(s -> request.header(HEADER, myNodeName + " " + s));
       }
     };
-    client.addListenerFactory(() -> listener);
+//    client.addListenerFactory(() -> listener);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d536ed5/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
index 4af1ae7..f3f072d 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
@@ -110,11 +110,10 @@ public class UpdateShardHandler implements SolrMetricProducer, SolrInfoBean {
     if (cfg != null) {
       updateOnlyClientBuilder
           .connectionTimeout(cfg.getDistributedConnectionTimeout())
-          .idleTimeout(cfg.getDistributedSocketTimeout())
-          .maxConnectionsPerHost(cfg.getMaxUpdateConnectionsPerHost());
+          .idleTimeout(cfg.getDistributedSocketTimeout());
     }
     updateOnlyClient = updateOnlyClientBuilder.build();
-    updateOnlyClient.addListenerFactory(updateHttpListenerFactory);
+//    updateOnlyClient.addListenerFactory(updateHttpListenerFactory);
 
     defaultClient = HttpClientUtil.createClient(clientParams, defaultConnectionManager, false, httpRequestExecutor);
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d536ed5/solr/core/src/test/org/apache/solr/security/HttpParamDelegationTokenPlugin.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/security/HttpParamDelegationTokenPlugin.java b/solr/core/src/test/org/apache/solr/security/HttpParamDelegationTokenPlugin.java
index a8f0355..f0bc23f 100644
--- a/solr/core/src/test/org/apache/solr/security/HttpParamDelegationTokenPlugin.java
+++ b/solr/core/src/test/org/apache/solr/security/HttpParamDelegationTokenPlugin.java
@@ -155,7 +155,7 @@ public class HttpParamDelegationTokenPlugin extends KerberosPlugin {
         getPrincipal().ifPresent(usr -> request.header(INTERNAL_REQUEST_HEADER, usr));
       }
     };
-    client.addListenerFactory(() -> listener);
+//    client.addListenerFactory(() -> listener);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d536ed5/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java b/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java
index c9aded7..6f701be 100644
--- a/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java
+++ b/solr/core/src/test/org/apache/solr/update/MockingHttp2SolrClient.java
@@ -118,20 +118,22 @@ public class MockingHttp2SolrClient extends Http2SolrClient {
     return super.request(request, collection);
   }
 
-  public NamedList<Object> request(SolrRequest request, String collection, OnComplete onComplete)
+  public void request(SolrRequest request, String collection, OnComplete onComplete)
       throws SolrServerException, IOException {
     if (request instanceof UpdateRequest) {
       UpdateRequest ur = (UpdateRequest) request;
       // won't throw exception if request is DBQ
       if (ur.getDeleteQuery() != null && !ur.getDeleteQuery().isEmpty()) {
-        return super.request(request, collection, onComplete);
+        super.request(request, collection, onComplete);
+        return;
       }
     }
 
     if (exp != null) {
       if (oneExpPerReq) {
         if (reqGotException.contains(request)) {
-          return super.request(request, collection, onComplete);
+          super.request(request, collection, onComplete);
+          return;
         }
         else
           reqGotException.add(request);
@@ -151,6 +153,6 @@ public class MockingHttp2SolrClient extends Http2SolrClient {
       }
     }
 
-    return super.request(request, collection, onComplete);
+    super.request(request, collection, onComplete);
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/3d536ed5/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 943e6c2..15923b9 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.client.solrj.impl;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -27,7 +28,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Set;
@@ -46,6 +46,7 @@ import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.V2RequestSupport;
 import org.apache.solr.client.solrj.embedded.SSLConfig;
+import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.request.RequestWriter;
 import org.apache.solr.client.solrj.request.V2Request;
 import org.apache.solr.common.SolrException;
@@ -59,6 +60,7 @@ import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.client.HttpClientTransport;
 import org.eclipse.jetty.client.ProtocolHandlers;
+import org.eclipse.jetty.client.api.ContentResponse;
 import org.eclipse.jetty.client.api.Request;
 import org.eclipse.jetty.client.api.Response;
 import org.eclipse.jetty.client.api.Result;
@@ -75,7 +77,6 @@ import org.eclipse.jetty.http.HttpHeader;
 import org.eclipse.jetty.http.HttpMethod;
 import org.eclipse.jetty.http2.client.HTTP2Client;
 import org.eclipse.jetty.http2.client.http.HttpClientTransportOverHTTP2;
-import org.eclipse.jetty.util.BlockingArrayQueue;
 import org.eclipse.jetty.util.Fields;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.eclipse.jetty.util.thread.QueuedThreadPool;
@@ -84,14 +85,15 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.common.util.Utils.getObjectByPath;
 
-// TODO: error handling, small Http2SolrClient features, security, ssl
+// TODO: error handling, small Http2SolrClient features, basic auth, security, ssl, apiV2 ...
 /**
  * @lucene.experimental
  */
 public class Http2SolrClient extends SolrClient {
   private static volatile SSLConfig defaultSSLConfig;
 
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final int MAX_OUTSTANDING_REQUESTS = 1000;
   private static final String AGENT = "Solr[" + Http2SolrClient.class.getName() + "] 2.0";
   private static final String UTF_8 = StandardCharsets.UTF_8.name();
   private static final String DEFAULT_PATH = "/select";
@@ -99,12 +101,40 @@ public class Http2SolrClient extends SolrClient {
 
   private HttpClient httpClient;
   private volatile Set<String> queryParams = Collections.emptySet();
+  private Phaser phaser = new Phaser(1);
+  private final Semaphore available;
   private int idleTimeout;
 
   private ResponseParser parser = new BinaryResponseParser();
   private volatile RequestWriter requestWriter = new BinaryRequestWriter();
-  private List<HttpListenerFactory> listenerFactory = new LinkedList<>();
-  private AsyncTracker asyncTracker = new AsyncTracker();
+  private volatile HttpListenerFactory listenerFactory;
+
+  private Request.QueuedListener requestQueuedListener = new Request.QueuedListener() {
+
+    @Override
+    public void onQueued(Request request) {
+      phaser.register();
+      try {
+        available.acquire();
+      } catch (InterruptedException e) {
+
+      }
+    }
+  };
+
+  private volatile Request.BeginListener beginListener = req -> {
+
+  };
+
+  private Response.CompleteListener requestCompleteListener = new Response.CompleteListener() {
+
+    @Override
+    public void onComplete(Result arg0) {
+      phaser.arriveAndDeregister();
+      available.release();
+    }
+  };
+
   /**
    * The URL of the Solr server.
    */
@@ -112,6 +142,9 @@ public class Http2SolrClient extends SolrClient {
   private boolean closeClient;
 
   protected Http2SolrClient(String serverBaseUrl, Builder builder) {
+    // TODO: what about shared instances?
+    available = new Semaphore(MAX_OUTSTANDING_REQUESTS, false);
+
     if (serverBaseUrl != null)  {
       if (!serverBaseUrl.equals("/") && serverBaseUrl.endsWith("/")) {
         serverBaseUrl = serverBaseUrl.substring(0, serverBaseUrl.length() - 1);
@@ -132,6 +165,7 @@ public class Http2SolrClient extends SolrClient {
     } else {
       httpClient = builder.httpClient;
     }
+    if (builder.beginListener != null) setBeginListener(builder.beginListener);
     if (!httpClient.isStarted()) {
       try {
         httpClient.start();
@@ -143,166 +177,187 @@ public class Http2SolrClient extends SolrClient {
     assert ObjectReleaseTracker.track(this);
   }
 
-  public void addListenerFactory(HttpListenerFactory factory) {
-    this.listenerFactory.add(factory);
+  public void setBeginListener(Request.BeginListener beginListener) {
+    this.beginListener = beginListener;
   }
 
-  HttpClient getHttpClient() {
-    return httpClient;
-  }
-
-  ProtocolHandlers getProtocolHandlers() {
+  public ProtocolHandlers getProtocolHandlers() {
     return httpClient.getProtocolHandlers();
   }
 
   private HttpClient createHttpClient(Builder builder) {
+
     HttpClient httpClient;
 
-    BlockingArrayQueue<Runnable> queue = new BlockingArrayQueue<>(128, 128);
-    QueuedThreadPool httpClientExecutor = new QueuedThreadPool(128, 4, 60000, queue);
+    QueuedThreadPool httpClientExecutor = new QueuedThreadPool(100, 4);
     httpClientExecutor.setDaemon(true);
 
-    SslContextFactory sslContextFactory;
-    if (builder.sslConfig == null) {
-      sslContextFactory = getDefaultSslContextFactory();
-    } else {
-      sslContextFactory = builder.sslConfig.createContextFactory();
-    }
-
     HttpClientTransport transport;
-    if (builder.useHttp1_1) {
-      log.debug("Create Http2SolrClient with HTTP/1.1 transport");
+    if (useHttp1(builder)) {
+      LOG.info("Create Http2SolrClient with HTTP/1.1 transport");
       transport = new HttpClientTransportOverHTTP(2);
+
+      SslContextFactory sslContextFactory;
+      if (builder.sslConfig == null) {
+        sslContextFactory = getDefaultSslContextFactory();
+      } else {
+        sslContextFactory = builder.sslConfig.createContextFactory();
+      }
       httpClient = new HttpClient(transport, sslContextFactory);
     } else {
-      log.debug("Create Http2SolrClient with HTTP/2 transport");
+      LOG.info("Create Http2SolrClient with HTTP/2 transport");
+      //TODO adding https support for HTTP2 when use JDK 9
       HTTP2Client http2client = new HTTP2Client();
       transport = new HttpClientTransportOverHTTP2(http2client);
-      httpClient = new HttpClient(transport, sslContextFactory);
+      httpClient = new HttpClient(transport, null);
     }
-
     httpClient.setExecutor(httpClientExecutor);
     httpClient.setStrictEventOrdering(false);
     httpClient.setConnectBlocking(true);
     httpClient.setFollowRedirects(false);
-    httpClient.setMaxRequestsQueuedPerDestination(asyncTracker.getMaxRequestsQueuedPerDestination());
+    httpClient.setMaxConnectionsPerDestination(4);
+    httpClient.setMaxRequestsQueuedPerDestination(MAX_OUTSTANDING_REQUESTS * 4); // comfortably above max outstanding     // requests
     httpClient.setUserAgentField(new HttpField(HttpHeader.USER_AGENT, AGENT));
 
-//    if (builder.maxConnectionsPerHost != null) httpClient.setMaxConnectionsPerDestination(builder.maxConnectionsPerHost);
-    httpClient.setMaxConnectionsPerDestination(4);
     if (builder.idleTimeout != null) httpClient.setIdleTimeout(builder.idleTimeout);
     if (builder.connectionTimeout != null) httpClient.setConnectTimeout(builder.connectionTimeout);
     return httpClient;
   }
 
+  private boolean useHttp1(Builder builder) {
+    if (serverBaseUrl != null && serverBaseUrl.startsWith("https"))
+      return true;
+
+    if (builder.useHttp1_1 || builder.sslConfig != null)
+      return true;
+
+    if (System.getProperty("javax.net.ssl.trustStore") != null)
+      return true;
+
+    return false;
+  }
+
+  public HttpClient getHttpClient() {
+    return httpClient;
+  }
+
   public void close() {
     // we wait for async requests, so far devs don't want to give sugar for this
-    asyncTracker.waitForComplete();
+    phaser.arriveAndAwaitAdvance();
+    phaser.arriveAndDeregister();
     if (closeClient) {
-      try {
-        // TODO: stop time?
-        httpClient.setStopTimeout(1000);
-        httpClient.stop();
-      } catch (Exception e) {
-        throw new RuntimeException("Exception on closing client", e);
-      }
+      close(httpClient);
     }
 
     assert ObjectReleaseTracker.release(this);
   }
 
+  public static void close(HttpClient httpClient) {
+    try {
+      // TODO: stop time?
+      httpClient.setStopTimeout(1000);
+      httpClient.stop();
+
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void request(SolrRequest solrRequest, String collection, OnComplete onComplete)
+      throws SolrServerException, IOException {
+    request(solrRequest, collection, onComplete, false);
+  }
+
   private boolean isV2ApiRequest(final SolrRequest request) {
     return request instanceof V2Request || request.getPath().contains("/____v2");
   }
 
-  public NamedList<Object> request(SolrRequest solrRequest,
+  private Http2ClientResponse request(SolrRequest solrRequest,
                                       String collection,
-                                      OnComplete onComplete) throws IOException, SolrServerException {
+                                      OnComplete onComplete,
+                                      boolean returnStream) throws IOException, SolrServerException {
     Request req = makeRequest(solrRequest, collection);
     setBasicAuthHeader(solrRequest, req);
-    for (HttpListenerFactory factory : listenerFactory) {
-      HttpListenerFactory.RequestResponseListener listener = factory.get();
-      req.onRequestQueued(listener);
+    if (listenerFactory != null) {
+      HttpListenerFactory.RequestResponseListener listener = listenerFactory.get();
       req.onRequestBegin(listener);
       req.onComplete(listener);
     }
 
-    if (onComplete != null) {
-      // This async call only suitable for indexing since the response size is limited by 5MB
-      req.onRequestQueued(asyncTracker.queuedListener)
-          .onComplete(asyncTracker.completeListener).send(new BufferingResponseListener(5 * 1024 * 1024) {
-
-        @Override
-        public void onComplete(Result result) {
-          if (result.isFailed()) {
-            onComplete.onFailure(result.getFailure());
-            return;
-          }
+    if (beginListener != null) {
+      // By calling listener here, we will make sure that SolrRequestInfo can be get from the same thread
+      beginListener.onBegin(req);
+    }
+    try {
+      if (onComplete != null) {
+        req.onRequestQueued(requestQueuedListener)
+            .onComplete(requestCompleteListener).send(new BufferingResponseListener() {
+
+          @Override
+          public void onComplete(Result result) {
+            if (result.isFailed()) {
+              onComplete.onFailure(result.getFailure());
+              return;
+            }
 
-          NamedList<Object> rsp;
-          try {
-            InputStream is = getContentAsInputStream();
-            assert ObjectReleaseTracker.track(is);
-            rsp = processErrorsAndResponse(result.getResponse(),
-                parser, is, getEncoding(), isV2ApiRequest(solrRequest));
-            onComplete.onSuccess(rsp);
-          } catch (Exception e) {
-            onComplete.onFailure(e);
+            // TODO: should we stream this?
+            try (InputStream ris = getContentAsInputStream()) {
+              NamedList<Object> rsp;
+              try {
+                rsp = processErrorsAndResponse(result.getResponse(),
+                    parser, ris, getEncoding(), isV2ApiRequest(solrRequest));
+                onComplete.onSuccess(rsp);
+              } catch (Exception e) {
+                onComplete.onFailure(e);
+              }
+            } catch (IOException e1) {
+              onComplete.onFailure(e1);
+            }
           }
+        });
+        return null;
+      } else {
+        Http2ClientResponse arsp = new Http2ClientResponse();
+        if (returnStream) {
+          InputStreamResponseListener listener = new InputStreamResponseListener();
+          req.send(listener);
+          // Wait for the response headers to arrive
+          listener.get(idleTimeout, TimeUnit.SECONDS);
+          // TODO: process response
+          arsp.stream = listener.getInputStream();
+        } else {
+          ContentResponse response = req.send();
+          ByteArrayInputStream is = new ByteArrayInputStream(response.getContent());
+          arsp.response = processErrorsAndResponse(response, parser,
+              is, response.getEncoding(), isV2ApiRequest(solrRequest));
         }
-      });
-      return null;
-    } else {
-      try {
-        InputStreamResponseListener listener = new InputStreamResponseListener();
-        req.send(listener);
-        Response response = listener.get(idleTimeout, TimeUnit.SECONDS);
-        InputStream is = listener.getInputStream();
-        assert ObjectReleaseTracker.track(is);
-        return processErrorsAndResponse(response, parser, is, getEncoding(response), isV2ApiRequest(solrRequest));
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(e);
-      } catch (TimeoutException e) {
+        return arsp;
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    } catch (TimeoutException e) {
+      throw new SolrServerException(
+          "Timeout occured while waiting response from server at: "
+              + getBaseURL(), e);
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof ConnectException) {
+        throw new SolrServerException("Server refused connection at: "
+            + getBaseURL(), cause);
+      }
+      if (cause instanceof SolrServerException) {
+        throw (SolrServerException) cause;
+      } else if (cause instanceof IOException) {
         throw new SolrServerException(
-            "Timeout occured while waiting response from server at: "
-                + getBaseURL(), e);
-      } catch (ExecutionException e) {
-        Throwable cause = e.getCause();
-        if (cause instanceof ConnectException) {
-          throw new SolrServerException("Server refused connection at: "
-              + getBaseURL(), cause);
-        }
-        if (cause instanceof SolrServerException) {
-          throw (SolrServerException) cause;
-        } else if (cause instanceof IOException) {
-          throw new SolrServerException(
-              "IOException occured when talking to server at: " + getBaseURL(), cause);
-        }
-        throw new SolrServerException(cause.getMessage(), cause);
+            "IOException occured when talking to server at: " + getBaseURL(), cause);
       }
+      throw new SolrServerException(cause.getMessage(), cause);
     }
   }
 
-  private String getEncoding(Response response) {
-    String contentType = response.getHeaders().get(HttpHeader.CONTENT_TYPE);
-    if (contentType != null) {
-      String charset = "charset=";
-      int index = contentType.toLowerCase(Locale.ENGLISH).indexOf(charset);
-      if (index > 0) {
-        String encoding = contentType.substring(index + charset.length());
-        // Sometimes charsets arrive with an ending semicolon.
-        int semicolon = encoding.indexOf(';');
-        if (semicolon > 0)
-          encoding = encoding.substring(0, semicolon).trim();
-        // Sometimes charsets are quoted.
-        int lastIndex = encoding.length() - 1;
-        if (encoding.charAt(0) == '"' && encoding.charAt(lastIndex) == '"')
-          encoding = encoding.substring(1, lastIndex).trim();
-        return encoding;
-      }
-    }
-    return null;
+  public void setListenerFactory(HttpListenerFactory listenerFactory) {
+    this.listenerFactory = listenerFactory;
   }
 
   private void setBasicAuthHeader(SolrRequest solrRequest, Request req) throws UnsupportedEncodingException {
@@ -454,133 +509,117 @@ public class Http2SolrClient extends SolrClient {
     return req;
   }
 
-  private boolean wantStream(final ResponseParser processor) {
-    return processor == null || processor instanceof InputStreamResponseParser;
-  }
-
   private NamedList<Object> processErrorsAndResponse(Response response,
                                                      final ResponseParser processor,
                                                      InputStream is,
                                                      String encoding,
                                                      final boolean isV2Api)
       throws SolrServerException {
-    boolean shouldClose = true;
-    try {
-      // handle some http level checks before trying to parse the response
-      int httpStatus = response.getStatus();
-
-      String contentType;
-      contentType = response.getHeaders().get("content-type");
-      if (contentType == null) contentType = "";
-
-      switch (httpStatus) {
-        case HttpStatus.SC_OK:
-        case HttpStatus.SC_BAD_REQUEST:
-        case HttpStatus.SC_CONFLICT: // 409
-          break;
-        case HttpStatus.SC_MOVED_PERMANENTLY:
-        case HttpStatus.SC_MOVED_TEMPORARILY:
-          if (!httpClient.isFollowRedirects()) {
-            throw new SolrServerException("Server at " + getBaseURL()
-                + " sent back a redirect (" + httpStatus + ").");
-          }
-          break;
-        default:
-          if (processor == null || "".equals(contentType)) {
-            throw new RemoteSolrException(serverBaseUrl, httpStatus, "non ok status: " + httpStatus
-                + ", message:" + response.getReason(),
-                null);
-          }
-      }
-
-      if (wantStream(parser)) {
-        // no processor specified, return raw stream
-        NamedList<Object> rsp = new NamedList<>();
-        rsp.add("stream", is);
-        // Only case where stream should not be closed
-        shouldClose = false;
-        return rsp;
-      }
+    // handle some http level checks before trying to parse the response
+    int httpStatus = response.getStatus();
+
+    String contentType;
+    contentType = response.getHeaders().get("content-type");
+    if (contentType == null) contentType = "";
+
+    switch (httpStatus) {
+      case HttpStatus.SC_OK:
+      case HttpStatus.SC_BAD_REQUEST:
+      case HttpStatus.SC_CONFLICT: // 409
+        break;
+      case HttpStatus.SC_MOVED_PERMANENTLY:
+      case HttpStatus.SC_MOVED_TEMPORARILY:
+        if (!httpClient.isFollowRedirects()) {
+          throw new SolrServerException("Server at " + getBaseURL()
+              + " sent back a redirect (" + httpStatus + ").");
+        }
+        break;
+      default:
+        if (processor == null || "".equals(contentType)) {
+          throw new RemoteSolrException(serverBaseUrl, httpStatus, "non ok status: " + httpStatus
+              + ", message:" + response.getReason(),
+              null);
+        }
+    }
 
-      String procCt = processor.getContentType();
-      if (procCt != null) {
-        String procMimeType = ContentType.parse(procCt).getMimeType().trim().toLowerCase(Locale.ROOT);
-        String mimeType = ContentType.parse(contentType).getMimeType().trim().toLowerCase(Locale.ROOT);
-        if (!procMimeType.equals(mimeType)) {
-          // unexpected mime type
-          String msg = "Expected mime type " + procMimeType + " but got " + mimeType + ".";
-          try {
-            msg = msg + " " + IOUtils.toString(is, encoding);
-          } catch (IOException e) {
-            throw new RemoteSolrException(serverBaseUrl, httpStatus, "Could not parse response with encoding " + encoding, e);
-          }
-          throw new RemoteSolrException(serverBaseUrl, httpStatus, msg, null);
+    String procCt = processor.getContentType();
+    if (procCt != null) {
+      String procMimeType = ContentType.parse(procCt).getMimeType().trim().toLowerCase(Locale.ROOT);
+      String mimeType = ContentType.parse(contentType).getMimeType().trim().toLowerCase(Locale.ROOT);
+      if (!procMimeType.equals(mimeType)) {
+        // unexpected mime type
+        String msg = "Expected mime type " + procMimeType + " but got " + mimeType + ".";
+        try {
+          msg = msg + " " + IOUtils.toString(is, encoding);
+        } catch (IOException e) {
+          throw new RemoteSolrException(serverBaseUrl, httpStatus, "Could not parse response with encoding " + encoding, e);
         }
+        throw new RemoteSolrException(serverBaseUrl, httpStatus, msg, null);
       }
+    }
 
-      NamedList<Object> rsp;
-      try {
-        rsp = parser.processResponse(is, encoding);
-      } catch (Exception e) {
-        throw new RemoteSolrException(serverBaseUrl, httpStatus, e.getMessage(), e);
-      }
+    NamedList<Object> rsp;
+    try {
+      rsp = parser.processResponse(is, encoding);
+    } catch (Exception e) {
+      throw new RemoteSolrException(serverBaseUrl, httpStatus, e.getMessage(), e);
+    }
 
-      Object error = rsp == null ? null : rsp.get("error");
-      if (error != null && (String.valueOf(getObjectByPath(error, true, errPath)).endsWith("ExceptionWithErrObject"))) {
-        throw RemoteExecutionException.create(serverBaseUrl, rsp);
-      }
-      if (httpStatus != HttpStatus.SC_OK && !isV2Api) {
-        NamedList<String> metadata = null;
-        String reason = null;
-        try {
-          NamedList err = (NamedList) rsp.get("error");
-          if (err != null) {
-            reason = (String) err.get("msg");
-            if (reason == null) {
-              reason = (String) err.get("trace");
-            }
-            metadata = (NamedList<String>) err.get("metadata");
-          }
-        } catch (Exception ex) {}
-        if (reason == null) {
-          StringBuilder msg = new StringBuilder();
-          msg.append(response.getReason())
-              .append("\n\n")
-              .append("request: ")
-              .append(response.getRequest().getMethod());
-          try {
-            reason = java.net.URLDecoder.decode(msg.toString(), UTF_8);
-          } catch (UnsupportedEncodingException e) {
+    Object error = rsp == null ? null : rsp.get("error");
+    if (error != null && (String.valueOf(getObjectByPath(error, true, errPath)).endsWith("ExceptionWithErrObject"))) {
+      throw RemoteExecutionException.create(serverBaseUrl, rsp);
+    }
+    if (httpStatus != HttpStatus.SC_OK && !isV2Api) {
+      NamedList<String> metadata = null;
+      String reason = null;
+      try {
+        NamedList err = (NamedList) rsp.get("error");
+        if (err != null) {
+          reason = (String) err.get("msg");
+          if (reason == null) {
+            reason = (String) err.get("trace");
           }
+          metadata = (NamedList<String>) err.get("metadata");
         }
-        RemoteSolrException rss = new RemoteSolrException(serverBaseUrl, httpStatus, reason, null);
-        if (metadata != null) rss.setMetadata(metadata);
-        throw rss;
+      } catch (Exception ex) {
       }
-      return rsp;
-    } finally {
-      if (shouldClose) {
+      if (reason == null) {
+        StringBuilder msg = new StringBuilder();
+        msg.append(response.getReason())
+            .append("\n\n")
+            .append("request: ")
+            .append(response.getRequest().getMethod());
         try {
-          is.close();
-          assert ObjectReleaseTracker.release(is);
-        } catch (IOException e) {
-          // quitely
+          reason = java.net.URLDecoder.decode(msg.toString(), UTF_8);
+        } catch (UnsupportedEncodingException e) {
         }
       }
+      RemoteSolrException rss = new RemoteSolrException(serverBaseUrl, httpStatus, reason, null);
+      if (metadata != null) rss.setMetadata(metadata);
+      throw rss;
     }
+    return rsp;
   }
 
   @Override
   public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
-    return request(request, collection, null);
+    return request(request, collection, null, false).response;
   }
 
   public void setRequestWriter(RequestWriter requestWriter) {
     this.requestWriter = requestWriter;
   }
 
+  private InputStream queryAndStreamResponse(String collection, SolrParams params)
+      throws SolrServerException, IOException {
+    QueryRequest queryRequest = new QueryRequest(params);
+    Http2ClientResponse resp = request(queryRequest, collection, null, true);
+    assert resp.stream != null;
+    return resp.stream;
+  }
+
   public interface OnComplete {
-    void onSuccess(NamedList<Object> result);
+    void onSuccess(NamedList result);
 
     void onFailure(Throwable e);
   }
@@ -593,54 +632,15 @@ public class Http2SolrClient extends SolrClient {
     return serverBaseUrl;
   }
 
-  private static class AsyncTracker {
-    private static final int MAX_OUTSTANDING_REQUESTS = 1000;
-
-    // wait for async requests
-    private final Phaser phaser;
-    // maximum outstanding requests left
-    private final Semaphore available;
-    private final Request.QueuedListener queuedListener;
-    private final Response.CompleteListener completeListener;
-
-    AsyncTracker() {
-      // TODO: what about shared instances?
-      phaser = new Phaser(1);
-      available = new Semaphore(MAX_OUTSTANDING_REQUESTS, false);
-      queuedListener = request -> {
-        phaser.register();
-        try {
-          available.acquire();
-        } catch (InterruptedException ignored) {
-
-        }
-      };
-      completeListener = result -> {
-        phaser.arriveAndDeregister();
-        available.release();
-      };
-    }
-
-    int getMaxRequestsQueuedPerDestination() {
-      // comfortably above max outstanding requests
-      return MAX_OUTSTANDING_REQUESTS * 3;
-    }
-
-    public void waitForComplete() {
-      phaser.arriveAndAwaitAdvance();
-      phaser.arriveAndDeregister();
-    }
-  }
-
   public static class Builder {
 
     private HttpClient httpClient;
     private SSLConfig sslConfig = defaultSSLConfig;
     private Integer idleTimeout;
     private Integer connectionTimeout;
-    private Integer maxConnectionsPerHost;
     private boolean useHttp1_1 = Boolean.getBoolean("solr.http1");
     protected String baseSolrUrl;
+    private Request.BeginListener beginListener = request -> {};
 
     public Builder() {
 
@@ -664,11 +664,6 @@ public class Http2SolrClient extends SolrClient {
       return this;
     }
 
-    public Builder maxConnectionsPerHost(int max) {
-      this.maxConnectionsPerHost = max;
-      return this;
-    }
-
     public Builder idleTimeout(int idleConnectionTimeout) {
       this.idleTimeout = idleConnectionTimeout;
       return this;
@@ -684,6 +679,10 @@ public class Http2SolrClient extends SolrClient {
       return this;
     }
 
+    public Builder withListener(Request.BeginListener beginListener) {
+      this.beginListener = beginListener;
+      return this;
+    }
   }
 
   /**
@@ -732,6 +731,11 @@ public class Http2SolrClient extends SolrClient {
     }
   }
 
+  protected static class Http2ClientResponse {
+    NamedList response;
+    InputStream stream;
+  }
+
   public Set<String> getQueryParams() {
     return queryParams;
   }
@@ -816,4 +820,4 @@ public class Http2SolrClient extends SolrClient {
     return sslContextFactory;
   }
 
-}
+}
\ No newline at end of file