You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2020/08/06 10:04:19 UTC

[lucene-solr] 01/01: SOLR-14713: Single thread on streaming updates

This is an automated email from the ASF dual-hosted git repository.

datcm pushed a commit to branch jira/SOLR-14713
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 5986b4cc3c83ac89d014d85ec4ea53d303800fe7
Author: Cao Manh Dat <da...@apache.org>
AuthorDate: Thu Aug 6 17:03:57 2020 +0700

    SOLR-14713: Single thread on streaming updates
---
 .../org/apache/solr/update/SolrCmdDistributor.java | 127 +++-----------
 .../apache/solr/update/StreamingSolrClients.java   | 188 ++++++++++++---------
 .../solr/update/MockStreamingSolrClients.java      |  47 +++---
 .../apache/solr/update/SolrCmdDistributorTest.java |  24 +--
 .../solr/client/solrj/impl/Http2SolrClient.java    |  76 +++++++--
 5 files changed, 226 insertions(+), 236 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index 9d2377e..ef627d5 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -26,18 +26,12 @@ import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
 
 import io.opentracing.Span;
 import io.opentracing.Tracer;
 import io.opentracing.propagation.Format;
 import org.apache.http.NoHttpResponseException;
-import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.BinaryResponseParser;
 import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
@@ -70,49 +64,37 @@ public class SolrCmdDistributor implements Closeable {
   private int retryPause = 500;
   
   private final List<Error> allErrors = new ArrayList<>();
-  private final List<Error> errors = Collections.synchronizedList(new ArrayList<Error>());
-  
-  private final CompletionService<Object> completionService;
-  private final Set<Future<Object>> pending = new HashSet<>();
-  
-  public static interface AbortCheck {
-    public boolean abortCheck();
-  }
-  
+
   public SolrCmdDistributor(UpdateShardHandler updateShardHandler) {
     this.clients = new StreamingSolrClients(updateShardHandler);
-    this.completionService = new ExecutorCompletionService<>(updateShardHandler.getUpdateExecutor());
   }
   
   /* For tests only */
   SolrCmdDistributor(StreamingSolrClients clients, int retryPause) {
     this.clients = clients;
     this.retryPause = retryPause;
-    completionService = new ExecutorCompletionService<>(clients.getUpdateExecutor());
   }
   
-  public void finish() {    
+  public void finish() {
+    if (finished)
+      return;
     try {
-      assert !finished : "lifecycle sanity check";
       finished = true;
 
       blockAndDoRetries();
     } catch (IOException e) {
       log.warn("Unable to finish sending updates", e);
-    } finally {
-      clients.shutdown();
     }
   }
   
   public void close() {
-    clients.shutdown();
+    clients.finishStreams();
   }
 
   private void doRetriesIfNeeded() throws IOException {
     // NOTE: retries will be forwards to a single url
     
-    List<Error> errors = new ArrayList<>(this.errors);
-    errors.addAll(clients.getErrors());
+    List<Error> errors = new ArrayList<>(clients.getErrors());
     List<Error> resubmitList = new ArrayList<>();
     
     if (log.isInfoEnabled() && errors.size() > 0) {
@@ -150,6 +132,8 @@ public class SolrCmdDistributor implements Closeable {
           err.req.retries++;
           resubmitList.add(err);
         } else {
+          // only track the error if we are not retrying the request
+          err.req.trackRequestResult(null, false);
           allErrors.add(err);
         }
       } catch (Exception e) {
@@ -173,7 +157,6 @@ public class SolrCmdDistributor implements Closeable {
     }
     
     clients.clearErrors();
-    this.errors.clear();
     for (Error err : resubmitList) {
       if (err.req.node instanceof ForwardNode) {
         SolrException.log(SolrCmdDistributor.log, "forwarding update to "
@@ -188,7 +171,7 @@ public class SolrCmdDistributor implements Closeable {
             + err.req.cmd.toString() + " params:"
             + err.req.uReq.getParams() + " rsp:" + err.statusCode, err.e);
       }
-      submit(err.req, false);
+      submit(err.req);
     }
     
     if (resubmitList.size() > 0) {
@@ -217,7 +200,7 @@ public class SolrCmdDistributor implements Closeable {
       } else {
         uReq.deleteByQuery(cmd.query);
       }
-      submit(new Req(cmd, node, uReq, sync, rollupTracker, leaderTracker), false);
+      submit(new Req(cmd, node, uReq, sync, rollupTracker, leaderTracker));
     }
   }
   
@@ -241,7 +224,7 @@ public class SolrCmdDistributor implements Closeable {
       if (cmd.isInPlaceUpdate()) {
         params.set(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, String.valueOf(cmd.prevVersion));
       }
-      submit(new Req(cmd, node, uReq, synchronous, rollupTracker, leaderTracker), false);
+      submit(new Req(cmd, node, uReq, synchronous, rollupTracker, leaderTracker));
     }
     
   }
@@ -258,28 +241,14 @@ public class SolrCmdDistributor implements Closeable {
       uReq.setParams(params);
 
       addCommit(uReq, cmd);
-      submit(new Req(cmd, node, uReq, false), true);
+      submit(new Req(cmd, node, uReq, false));
     }
     
   }
 
   public void blockAndDoRetries() throws IOException {
-    clients.blockUntilFinished();
-    
-    // wait for any async commits to complete
-    while (pending != null && pending.size() > 0) {
-      Future<Object> future = null;
-      try {
-        future = completionService.take();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        log.error("blockAndDoRetries interrupted", e);
-      }
-      if (future == null) break;
-      pending.remove(future);
-    }
+    clients.finishStreams();
     doRetriesIfNeeded();
-
   }
   
   void addCommit(UpdateRequest ureq, CommitUpdateCommand cmd) {
@@ -288,7 +257,7 @@ public class SolrCmdDistributor implements Closeable {
         : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes, cmd.openSearcher);
   }
 
-  private void submit(final Req req, boolean isCommit) throws IOException {
+  private void submit(final Req req) throws IOException {
     // Copy user principal from the original request to the new update request, for later authentication interceptor use
     if (SolrRequestInfo.getRequestInfo() != null) {
       req.uReq.setUserPrincipal(SolrRequestInfo.getRequestInfo().getReq().getUserPrincipal());
@@ -301,60 +270,13 @@ public class SolrCmdDistributor implements Closeable {
           new SolrRequestCarrier(req.uReq));
     }
 
-    if (req.synchronous) {
-      blockAndDoRetries();
-
-      try {
-        req.uReq.setBasePath(req.node.getUrl());
-        clients.getHttpClient().request(req.uReq);
-      } catch (Exception e) {
-        SolrException.log(log, e);
-        Error error = new Error();
-        error.e = e;
-        error.req = req;
-        if (e instanceof SolrException) {
-          error.statusCode = ((SolrException) e).code();
-        }
-        errors.add(error);
-      }
-      
-      return;
-    }
-    
     if (log.isDebugEnabled()) {
       log.debug("sending update to {} retry: {} {} params {}"
-          , req.node.getUrl(), req.retries, req.cmd, req.uReq.getParams());
-    }
-    
-    if (isCommit) {
-      // a commit using ConncurrentUpdateSolrServer is not async,
-      // so we make it async to prevent commits from happening
-      // serially across multiple nodes
-      pending.add(completionService.submit(() -> {
-        doRequest(req);
-        return null;
-      }));
-    } else {
-      doRequest(req);
-    }
-  }
-  
-  private void doRequest(final Req req) {
-    try {
-      SolrClient solrClient = clients.getSolrClient(req);
-      solrClient.request(req.uReq);
-    } catch (Exception e) {
-      SolrException.log(log, e);
-      Error error = new Error();
-      error.e = e;
-      error.req = req;
-      if (e instanceof SolrException) {
-        error.statusCode = ((SolrException) e).code();
-      }
-      errors.add(error);
+              , req.node.getUrl(), req.retries, req.cmd, req.uReq.getParams());
     }
+    clients.getClient(req).request(req);
   }
-  
+
   public static class Req {
     public Node node;
     public UpdateRequest uReq;
@@ -406,11 +328,11 @@ public class SolrCmdDistributor implements Closeable {
     //
     // In the case of a leaderTracker and rollupTracker both being present, then we need to take care when assembling
     // the final response to check both the rollup and leader trackers on the aggregator node.
-    public void trackRequestResult(org.eclipse.jetty.client.api.Response resp, InputStream respBody, boolean success) {
+    public void trackRequestResult(NamedList<Object> rs, boolean success) {
 
       // Returning Integer.MAX_VALUE here means there was no "rf" on the response, therefore we just need to increment
       // our achieved rf if we are a leader, i.e. have a leaderTracker.
-      int rfFromResp = getRfFromResponse(respBody);
+      int rfFromResp = getRfFromResponse(rs);
 
       if (leaderTracker != null && rfFromResp == Integer.MAX_VALUE) {
         leaderTracker.trackRequestResult(node, success);
@@ -421,11 +343,9 @@ public class SolrCmdDistributor implements Closeable {
       }
     }
 
-    private int getRfFromResponse(InputStream inputStream) {
-      if (inputStream != null) {
+    private int getRfFromResponse(NamedList<Object> nl) {
+      if (nl != null) {
         try {
-          BinaryResponseParser brp = new BinaryResponseParser();
-          NamedList<Object> nl = brp.processResponse(inputStream, null);
           Object hdr = nl.get("responseHeader");
           if (hdr != null && hdr instanceof NamedList) {
             @SuppressWarnings({"unchecked"})
@@ -453,6 +373,7 @@ public class SolrCmdDistributor implements Closeable {
   public static class Error {
     public Exception e;
     public int statusCode = -1;
+    public Boolean retriable;
 
     /**
      * NOTE: This is the request that happened to be executed when this error was <b>triggered</b> the error, 
@@ -465,8 +386,8 @@ public class SolrCmdDistributor implements Closeable {
     public String toString() {
       StringBuilder sb = new StringBuilder();
       sb.append("SolrCmdDistributor$Error: statusCode=").append(statusCode);
-      sb.append("; exception=").append(String.valueOf(e));
-      sb.append("; req=").append(String.valueOf(req));
+      sb.append("; exception=").append(e);
+      sb.append("; req=").append(req);
       return sb.toString();
     }
   }
diff --git a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
index c9040c9..a2646cb 100644
--- a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
+++ b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
@@ -26,10 +26,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
-import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.impl.ConcurrentUpdateHttp2SolrClient;
 import org.apache.solr.client.solrj.impl.Http2SolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.update.SolrCmdDistributor.Error;
 import org.eclipse.jetty.client.api.Response;
 import org.slf4j.Logger;
@@ -38,14 +39,10 @@ import org.slf4j.LoggerFactory;
 public class StreamingSolrClients {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private final int runnerCount = Integer.getInteger("solr.cloud.replication.runners", 1);
-  // should be less than solr.jetty.http.idleTimeout
-  private final int pollQueueTime = Integer.getInteger("solr.cloud.client.pollQueueTime", 10000);
-
   private Http2SolrClient httpClient;
 
-  private Map<String, ConcurrentUpdateHttp2SolrClient> solrClients = new HashMap<>();
-  private List<Error> errors = Collections.synchronizedList(new ArrayList<Error>());
+  private Map<String, SingleStreamClient> clients = new HashMap<>();
+  private List<Error> errors = Collections.synchronizedList(new ArrayList<>());
 
   private ExecutorService updateExecutor;
 
@@ -62,35 +59,124 @@ public class StreamingSolrClients {
     errors.clear();
   }
 
-  public synchronized SolrClient getSolrClient(final SolrCmdDistributor.Req req) {
+  public synchronized SingleStreamClient getClient(final SolrCmdDistributor.Req req) {
     String url = getFullUrl(req.node.getUrl());
-    ConcurrentUpdateHttp2SolrClient client = solrClients.get(url);
+    SingleStreamClient client = clients.get(url);
     if (client == null) {
-      // NOTE: increasing to more than 1 threadCount for the client could cause updates to be reordered
-      // on a greater scale since the current behavior is to only increase the number of connections/Runners when
-      // the queue is more than half full.
-      client = new ErrorReportingConcurrentUpdateSolrClient.Builder(url, httpClient, req, errors)
-          .withQueueSize(100)
-          .withThreadCount(runnerCount)
-          .withExecutorService(updateExecutor)
-          .alwaysStreamDeletes()
-          .build();
-      client.setPollQueueTime(pollQueueTime); // minimize connections created
-      solrClients.put(url, client);
+      client = new SingleStreamClient(httpClient, url);
+      clients.put(url, client);
     }
 
     return client;
   }
 
-  public synchronized void blockUntilFinished() throws IOException {
-    for (ConcurrentUpdateHttp2SolrClient client : solrClients.values()) {
-      client.blockUntilFinished();
+  class SingleStreamClient {
+
+    private final Http2SolrClient client;
+    private final String basePath;
+    private Http2SolrClient.OutStream outStream;
+    private SolrCmdDistributor.Req lastReq;
+    private int numRequests = 0;
+
+    public SingleStreamClient(Http2SolrClient client, String basePath) {
+      this.client = client;
+      this.basePath = basePath;
+    }
+
+    public void request(SolrCmdDistributor.Req req) {
+      this.lastReq = req;
+      if (req.synchronous) {
+        closeStream();
+        sendSyncReq(req);
+        return;
+      }
+
+      UpdateRequest request = req.uReq;
+      numRequests++;
+      //TODO datcm serialize the request up-front and reuse it
+      try {
+        if (outStream == null) {
+          outStream = client.initOutStream(basePath, request, request.getCollection());
+        }
+        client.send(outStream, request, request.getCollection());
+      } catch (IOException e) {
+        if (outStream != null) {
+          closeStream();
+        } else {
+          handleError(e, req);
+        }
+      }
+    }
+
+    protected void handleError(Exception ex, SolrCmdDistributor.Req req) {
+      log.error("Error when calling {} to {}", req, req.node.getUrl(), ex);
+      Error error = new Error();
+      error.e = ex;
+      if (ex instanceof SolrException) {
+        error.statusCode = ((SolrException) ex).code();
+      }
+      error.req = req;
+      errors.add(error);
+    }
+
+    public void preFinish() {
+      if (outStream != null) {
+        try {
+          client.closeStream(outStream);
+        } catch (IOException e) {
+          if (numRequests > 1) {
+            handleError(new RuntimeException(), lastReq);
+          } else {
+            handleError(e, lastReq);
+          }
+        }
+      }
+    }
+
+    public void finish() {
+      closeStream();
+    }
+
+    private void sendSyncReq(SolrCmdDistributor.Req req) {
+      try {
+        NamedList<Object> rs = client.request(req.uReq);
+        trackResult(rs);
+      } catch (Exception e) {
+        handleError(e, req);
+      }
+    }
+
+    private void trackResult(NamedList<Object> rs) {
+      lastReq.trackRequestResult(rs, true);
+    }
+
+    /**
+     * Close previous stream and call error handling code
+     */
+    private void closeStream() {
+      if (outStream != null) {
+        try {
+          NamedList<Object> rs = client.closeAndGetResponse(outStream);
+          trackResult(rs);
+        } catch (Exception e) {
+          if (numRequests > 1) {
+            handleError(new RuntimeException(), lastReq);
+          } else {
+            handleError(e, lastReq);
+          }
+        }
+      }
+      outStream = null;
     }
   }
 
-  public synchronized void shutdown() {
-    for (ConcurrentUpdateHttp2SolrClient client : solrClients.values()) {
-      client.close();
+  public synchronized void finishStreams() {
+    //TODO nocommit SOLR-13975
+    for (SingleStreamClient client : clients.values()) {
+      client.preFinish();
+    }
+    for (SingleStreamClient client : clients.values()) {
+      client.closeStream();
     }
   }
 
@@ -108,54 +194,4 @@ public class StreamingSolrClients {
     return httpClient;
   }
 
-  public ExecutorService getUpdateExecutor() {
-    return updateExecutor;
-  }
-}
-
-class ErrorReportingConcurrentUpdateSolrClient extends ConcurrentUpdateHttp2SolrClient {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final SolrCmdDistributor.Req req;
-  private final List<Error> errors;
-
-  public ErrorReportingConcurrentUpdateSolrClient(Builder builder) {
-    super(builder);
-    this.req = builder.req;
-    this.errors = builder.errors;
-  }
-
-  @Override
-  public void handleError(Throwable ex) {
-    log.error("Error when calling {} to {}", req, req.node.getUrl(), ex);
-    Error error = new Error();
-    error.e = (Exception) ex;
-    if (ex instanceof SolrException) {
-      error.statusCode = ((SolrException) ex).code();
-    }
-    error.req = req;
-    errors.add(error);
-    if (!req.shouldRetry(error)) {
-      // only track the error if we are not retrying the request
-      req.trackRequestResult(null, null, false);
-    }
-  }
-  @Override
-  public void onSuccess(Response resp, InputStream respBody) {
-    req.trackRequestResult(resp, respBody, true);
-  }
-
-  static class Builder extends ConcurrentUpdateHttp2SolrClient.Builder {
-    protected SolrCmdDistributor.Req req;
-    protected List<Error> errors;
-
-    public Builder(String baseSolrUrl, Http2SolrClient client, SolrCmdDistributor.Req req, List<Error> errors) {
-      super(baseSolrUrl, client);
-      this.req = req;
-      this.errors = errors;
-    }
-
-    public ErrorReportingConcurrentUpdateSolrClient build() {
-      return new ErrorReportingConcurrentUpdateSolrClient(this);
-    }
-  }
 }
diff --git a/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java b/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java
index bfa6221..a19ca75 100644
--- a/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java
+++ b/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java
@@ -23,6 +23,7 @@ import org.apache.lucene.util.LuceneTestCase;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.util.NamedList;
 
@@ -35,13 +36,13 @@ public class MockStreamingSolrClients extends StreamingSolrClients {
   public MockStreamingSolrClients(UpdateShardHandler updateShardHandler) {
     super(updateShardHandler);
   }
-  
+
   @Override
-  public synchronized SolrClient getSolrClient(final SolrCmdDistributor.Req req) {
-    SolrClient client = super.getSolrClient(req);
-    return new MockSolrClient(client);
+  public synchronized SingleStreamClient getClient(SolrCmdDistributor.Req req) {
+    SingleStreamClient client = super.getClient(req);
+    return new MockClient(client);
   }
-  
+
   public void setExp(Exp exp) {
     this.exp = exp;
   }
@@ -60,37 +61,41 @@ public class MockStreamingSolrClients extends StreamingSolrClients {
     return null;
   }
 
-  class MockSolrClient extends SolrClient {
+  class MockClient extends SingleStreamClient {
+    SingleStreamClient client;
 
-    private SolrClient solrClient;
-
-    public MockSolrClient(SolrClient solrClient) {
-      this.solrClient = solrClient;
+    public MockClient(SingleStreamClient client) {
+      super(null, null);
+      this.client = client;
     }
-    
+
     @Override
-    public NamedList<Object> request(@SuppressWarnings({"rawtypes"})SolrRequest request, String collection)
-        throws SolrServerException, IOException {
+    public void request(SolrCmdDistributor.Req req) {
       if (exp != null) {
         Exception e = exception();
         if (e instanceof IOException) {
           if (LuceneTestCase.random().nextBoolean()) {
-            throw (IOException)e;
+            this.client.handleError(e, req);
+            return;
           } else {
-            throw new SolrServerException(e);
+            this.client.handleError(e, req);
+            return;
           }
         } else if (e instanceof SolrServerException) {
-          throw (SolrServerException)e;
+          this.client.handleError(e, req);
+          return;
         } else {
-          throw new SolrServerException(e);
+          this.client.handleError(e, req);
+          return;
         }
       }
-      
-      return solrClient.request(request);
+      this.client.request(req);
     }
 
     @Override
-    public void close() {}
-    
+    public void finish() {
+      this.client.finish();
+    }
   }
+
 }
diff --git a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
index 77f5a89..c4bc9e4 100644
--- a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
@@ -19,6 +19,7 @@ package org.apache.solr.update;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.net.SocketException;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -61,11 +62,14 @@ import org.apache.solr.util.TestInjection;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.xml.sax.SAXException;
 
 // See: https://issues.apache.org/jira/browse/SOLR-12028 Tests cannot remove files on Windows machines occasionally
 public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
-  
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static enum NodeType {FORWARD, STANDARD};
   
   private AtomicInteger id = new AtomicInteger();
@@ -359,7 +363,6 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
     testDeletes(false, false);
     testDeletes(true, true);
     testDeletes(true, false);
-    getRfFromResponseShouldNotCloseTheInputStream();
     testStuckUpdates();
   }
   
@@ -540,22 +543,6 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
     assertFalse(req.shouldRetry(err));
   }
 
-  public void getRfFromResponseShouldNotCloseTheInputStream() {
-    UpdateRequest dbqReq = new UpdateRequest();
-    dbqReq.deleteByQuery("*:*");
-    SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 1), dbqReq, true);
-    AtomicBoolean isClosed = new AtomicBoolean(false);
-    ByteArrayInputStream is = new ByteArrayInputStream(new byte[100]) {
-      @Override
-      public void close() throws IOException {
-        isClosed.set(true);
-        super.close();
-      }
-    };
-    req.trackRequestResult(null, is, true);
-    assertFalse("Underlying stream should not be closed!", isClosed.get());
-  }
-  
   private void testReqShouldRetryMaxRetries() {
     Error err = getError(new SocketException()); 
     SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 1), new UpdateRequest(), true);
@@ -787,6 +774,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
   }
 
   private void testRetryNodeAgainstBadAddress() throws SolrServerException, IOException {
+    log.info("Datcm start test");
     // Test RetryNode
     try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler)) {
       final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 218a096..82788b0 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -266,14 +266,22 @@ public class Http2SolrClient extends SolrClient {
     private final OutputStreamContentProvider outProvider;
     private final InputStreamResponseListener responseListener;
     private final boolean isXml;
+    private final boolean isV2Api;
+    private final String destUri;
+    private boolean closed;
+    private long closedTime;
 
     public OutStream(String origCollection, ModifiableSolrParams origParams,
-                     OutputStreamContentProvider outProvider, InputStreamResponseListener responseListener, boolean isXml) {
+                     OutputStreamContentProvider outProvider, InputStreamResponseListener responseListener,
+                     String destUri,
+                     boolean isXml, boolean isV2Api) {
       this.origCollection = origCollection;
       this.origParams = origParams;
       this.outProvider = outProvider;
       this.responseListener = responseListener;
       this.isXml = isXml;
+      this.isV2Api = isV2Api;
+      this.destUri = destUri;
     }
 
     boolean belongToThisStream(@SuppressWarnings({"rawtypes"})SolrRequest solrRequest, String collection) {
@@ -294,10 +302,23 @@ public class Http2SolrClient extends SolrClient {
 
     @Override
     public void close() throws IOException {
+      if (closed) {
+        return;
+      }
       if (isXml) {
         write("</stream>".getBytes(FALLBACK_CHARSET));
       }
       this.outProvider.getOutputStream().close();
+      closed = true;
+      closedTime = System.nanoTime();
+    }
+
+    private long getIdleTimeout(long idleTimeout) {
+      long timePassed = (System.nanoTime() - closedTime) / 1000;
+      if (timePassed >= idleTimeout) {
+        return 0;
+      }
+      return idleTimeout - timePassed;
     }
 
     //TODO this class should be hidden
@@ -337,7 +358,8 @@ public class Http2SolrClient extends SolrClient {
 
     boolean isXml = ClientUtils.TEXT_XML.equals(requestWriter.getUpdateContentType());
     OutStream outStream = new OutStream(collection, origParams, provider, responseListener,
-        isXml);
+            postRequest.getURI().toString(),
+            isXml, isV2ApiRequest(updateRequest));
     if (isXml) {
       outStream.write("<stream>".getBytes(FALLBACK_CHARSET));
     }
@@ -380,6 +402,7 @@ public class Http2SolrClient extends SolrClient {
       asyncListener.onFailure(e);
       return FAILED_MAKING_REQUEST_CANCELLABLE;
     }
+    final boolean isV2Api = isV2ApiRequest(solrRequest);
     final ResponseParser parser = solrRequest.getResponseParser() == null
         ? this.parser: solrRequest.getResponseParser();
     req.onRequestQueued(asyncTracker.queuedListener)
@@ -393,7 +416,7 @@ public class Http2SolrClient extends SolrClient {
               InputStream is = listener.getInputStream();
               assert ObjectReleaseTracker.track(is);
               try {
-                NamedList<Object> body = processErrorsAndResponse(solrRequest, parser, response, is);
+                NamedList<Object> body = processErrorsAndResponse(parser, response, is, isV2Api);
                 asyncListener.onSuccess(body);
               } catch (RemoteSolrException e) {
                 if (SolrException.getRootCause(e) != CANCELLED_EXCEPTION) {
@@ -416,43 +439,60 @@ public class Http2SolrClient extends SolrClient {
     return () -> req.abort(CANCELLED_EXCEPTION);
   }
 
-  @Override
-  public NamedList<Object> request(@SuppressWarnings({"rawtypes"}) SolrRequest solrRequest, String collection) throws SolrServerException, IOException {
-    Request req = makeRequest(solrRequest, collection);
-    final ResponseParser parser = solrRequest.getResponseParser() == null
-        ? this.parser: solrRequest.getResponseParser();
+  public void closeStream(OutStream outStream) throws IOException {
+    outStream.close();
+  }
+
+  public NamedList<Object> closeAndGetResponse(OutStream outStream) throws IOException, SolrServerException {
+    outStream.close();
+    return getResponseFrom(outStream.responseListener, outStream.destUri, outStream.isV2Api, null, outStream.getIdleTimeout(idleTimeout));
+  }
 
+  private NamedList<Object> getResponseFrom(InputStreamResponseListener listener,
+                                            String destUri, boolean isV2Api,
+                                            ResponseParser parser, long idleTimeout) throws SolrServerException, IOException {
     try {
-      InputStreamResponseListener listener = new InputStreamResponseListener();
-      req.send(listener);
       Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS);
       InputStream is = listener.getInputStream();
       assert ObjectReleaseTracker.track(is);
-
-      return processErrorsAndResponse(solrRequest, parser, response, is);
+      if (parser == null) {
+        parser = this.parser;
+      }
+      return processErrorsAndResponse(parser, response, is, isV2Api);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException(e);
     } catch (TimeoutException e) {
       throw new SolrServerException(
-          "Timeout occured while waiting response from server at: " + req.getURI(), e);
+              "Timeout occured while waiting response from server at: " + destUri, e);
     } catch (ExecutionException e) {
       Throwable cause = e.getCause();
       if (cause instanceof ConnectException) {
-        throw new SolrServerException("Server refused connection at: " + req.getURI(), cause);
+        throw new SolrServerException("Server refused connection at: " + destUri, cause);
       }
       if (cause instanceof SolrServerException) {
         throw (SolrServerException) cause;
       } else if (cause instanceof IOException) {
         throw new SolrServerException(
-            "IOException occured when talking to server at: " + getBaseURL(), cause);
+                "IOException occured when talking to server at: " + destUri, cause);
       }
       throw new SolrServerException(cause.getMessage(), cause);
     }
   }
 
-  private NamedList<Object> processErrorsAndResponse(@SuppressWarnings({"rawtypes"})SolrRequest solrRequest,
-                                                     ResponseParser parser, Response response, InputStream is) throws SolrServerException {
+  @Override
+  public NamedList<Object> request(@SuppressWarnings({"rawtypes"}) SolrRequest solrRequest, String collection) throws SolrServerException, IOException {
+    Request req = makeRequest(solrRequest, collection);
+    final ResponseParser parser = solrRequest.getResponseParser() == null
+        ? this.parser: solrRequest.getResponseParser();
+
+    InputStreamResponseListener listener = new InputStreamResponseListener();
+    req.send(listener);
+    return getResponseFrom(listener, req.getURI().toString(), isV2ApiRequest(solrRequest), parser, idleTimeout);
+  }
+
+  private NamedList<Object> processErrorsAndResponse(ResponseParser parser, Response response,
+                                                     InputStream is, boolean isV2Api) throws SolrServerException {
     ContentType contentType = getContentType(response);
     String mimeType = null;
     String encoding = null;
@@ -460,7 +500,7 @@ public class Http2SolrClient extends SolrClient {
       mimeType = contentType.getMimeType();
       encoding = contentType.getCharset() != null? contentType.getCharset().name() : null;
     }
-    return processErrorsAndResponse(response, parser, is, mimeType, encoding, isV2ApiRequest(solrRequest));
+    return processErrorsAndResponse(response, parser, is, mimeType, encoding, isV2Api);
   }
 
   private ContentType getContentType(Response response) {