You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2020/08/06 10:04:18 UTC

[lucene-solr] branch jira/SOLR-14713 created (now 5986b4c)

This is an automated email from the ASF dual-hosted git repository.

datcm pushed a change to branch jira/SOLR-14713
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git.


      at 5986b4c  SOLR-14713: Single thread on streaming updates

This branch includes the following new commits:

     new 5986b4c  SOLR-14713: Single thread on streaming updates

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[lucene-solr] 01/01: SOLR-14713: Single thread on streaming updates

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

datcm pushed a commit to branch jira/SOLR-14713
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 5986b4cc3c83ac89d014d85ec4ea53d303800fe7
Author: Cao Manh Dat <da...@apache.org>
AuthorDate: Thu Aug 6 17:03:57 2020 +0700

    SOLR-14713: Single thread on streaming updates
---
 .../org/apache/solr/update/SolrCmdDistributor.java | 127 +++-----------
 .../apache/solr/update/StreamingSolrClients.java   | 188 ++++++++++++---------
 .../solr/update/MockStreamingSolrClients.java      |  47 +++---
 .../apache/solr/update/SolrCmdDistributorTest.java |  24 +--
 .../solr/client/solrj/impl/Http2SolrClient.java    |  76 +++++++--
 5 files changed, 226 insertions(+), 236 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index 9d2377e..ef627d5 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -26,18 +26,12 @@ import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
 
 import io.opentracing.Span;
 import io.opentracing.Tracer;
 import io.opentracing.propagation.Format;
 import org.apache.http.NoHttpResponseException;
-import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.BinaryResponseParser;
 import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
@@ -70,49 +64,37 @@ public class SolrCmdDistributor implements Closeable {
   private int retryPause = 500;
   
   private final List<Error> allErrors = new ArrayList<>();
-  private final List<Error> errors = Collections.synchronizedList(new ArrayList<Error>());
-  
-  private final CompletionService<Object> completionService;
-  private final Set<Future<Object>> pending = new HashSet<>();
-  
-  public static interface AbortCheck {
-    public boolean abortCheck();
-  }
-  
+
   public SolrCmdDistributor(UpdateShardHandler updateShardHandler) {
     this.clients = new StreamingSolrClients(updateShardHandler);
-    this.completionService = new ExecutorCompletionService<>(updateShardHandler.getUpdateExecutor());
   }
   
   /* For tests only */
   SolrCmdDistributor(StreamingSolrClients clients, int retryPause) {
     this.clients = clients;
     this.retryPause = retryPause;
-    completionService = new ExecutorCompletionService<>(clients.getUpdateExecutor());
   }
   
-  public void finish() {    
+  public void finish() {
+    if (finished)
+      return;
     try {
-      assert !finished : "lifecycle sanity check";
       finished = true;
 
       blockAndDoRetries();
     } catch (IOException e) {
       log.warn("Unable to finish sending updates", e);
-    } finally {
-      clients.shutdown();
     }
   }
   
   public void close() {
-    clients.shutdown();
+    clients.finishStreams();
   }
 
   private void doRetriesIfNeeded() throws IOException {
     // NOTE: retries will be forwards to a single url
     
-    List<Error> errors = new ArrayList<>(this.errors);
-    errors.addAll(clients.getErrors());
+    List<Error> errors = new ArrayList<>(clients.getErrors());
     List<Error> resubmitList = new ArrayList<>();
     
     if (log.isInfoEnabled() && errors.size() > 0) {
@@ -150,6 +132,8 @@ public class SolrCmdDistributor implements Closeable {
           err.req.retries++;
           resubmitList.add(err);
         } else {
+          // only track the error if we are not retrying the request
+          err.req.trackRequestResult(null, false);
           allErrors.add(err);
         }
       } catch (Exception e) {
@@ -173,7 +157,6 @@ public class SolrCmdDistributor implements Closeable {
     }
     
     clients.clearErrors();
-    this.errors.clear();
     for (Error err : resubmitList) {
       if (err.req.node instanceof ForwardNode) {
         SolrException.log(SolrCmdDistributor.log, "forwarding update to "
@@ -188,7 +171,7 @@ public class SolrCmdDistributor implements Closeable {
             + err.req.cmd.toString() + " params:"
             + err.req.uReq.getParams() + " rsp:" + err.statusCode, err.e);
       }
-      submit(err.req, false);
+      submit(err.req);
     }
     
     if (resubmitList.size() > 0) {
@@ -217,7 +200,7 @@ public class SolrCmdDistributor implements Closeable {
       } else {
         uReq.deleteByQuery(cmd.query);
       }
-      submit(new Req(cmd, node, uReq, sync, rollupTracker, leaderTracker), false);
+      submit(new Req(cmd, node, uReq, sync, rollupTracker, leaderTracker));
     }
   }
   
@@ -241,7 +224,7 @@ public class SolrCmdDistributor implements Closeable {
       if (cmd.isInPlaceUpdate()) {
         params.set(DistributedUpdateProcessor.DISTRIB_INPLACE_PREVVERSION, String.valueOf(cmd.prevVersion));
       }
-      submit(new Req(cmd, node, uReq, synchronous, rollupTracker, leaderTracker), false);
+      submit(new Req(cmd, node, uReq, synchronous, rollupTracker, leaderTracker));
     }
     
   }
@@ -258,28 +241,14 @@ public class SolrCmdDistributor implements Closeable {
       uReq.setParams(params);
 
       addCommit(uReq, cmd);
-      submit(new Req(cmd, node, uReq, false), true);
+      submit(new Req(cmd, node, uReq, false));
     }
     
   }
 
   public void blockAndDoRetries() throws IOException {
-    clients.blockUntilFinished();
-    
-    // wait for any async commits to complete
-    while (pending != null && pending.size() > 0) {
-      Future<Object> future = null;
-      try {
-        future = completionService.take();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        log.error("blockAndDoRetries interrupted", e);
-      }
-      if (future == null) break;
-      pending.remove(future);
-    }
+    clients.finishStreams();
     doRetriesIfNeeded();
-
   }
   
   void addCommit(UpdateRequest ureq, CommitUpdateCommand cmd) {
@@ -288,7 +257,7 @@ public class SolrCmdDistributor implements Closeable {
         : AbstractUpdateRequest.ACTION.COMMIT, false, cmd.waitSearcher, cmd.maxOptimizeSegments, cmd.softCommit, cmd.expungeDeletes, cmd.openSearcher);
   }
 
-  private void submit(final Req req, boolean isCommit) throws IOException {
+  private void submit(final Req req) throws IOException {
     // Copy user principal from the original request to the new update request, for later authentication interceptor use
     if (SolrRequestInfo.getRequestInfo() != null) {
       req.uReq.setUserPrincipal(SolrRequestInfo.getRequestInfo().getReq().getUserPrincipal());
@@ -301,60 +270,13 @@ public class SolrCmdDistributor implements Closeable {
           new SolrRequestCarrier(req.uReq));
     }
 
-    if (req.synchronous) {
-      blockAndDoRetries();
-
-      try {
-        req.uReq.setBasePath(req.node.getUrl());
-        clients.getHttpClient().request(req.uReq);
-      } catch (Exception e) {
-        SolrException.log(log, e);
-        Error error = new Error();
-        error.e = e;
-        error.req = req;
-        if (e instanceof SolrException) {
-          error.statusCode = ((SolrException) e).code();
-        }
-        errors.add(error);
-      }
-      
-      return;
-    }
-    
     if (log.isDebugEnabled()) {
       log.debug("sending update to {} retry: {} {} params {}"
-          , req.node.getUrl(), req.retries, req.cmd, req.uReq.getParams());
-    }
-    
-    if (isCommit) {
-      // a commit using ConncurrentUpdateSolrServer is not async,
-      // so we make it async to prevent commits from happening
-      // serially across multiple nodes
-      pending.add(completionService.submit(() -> {
-        doRequest(req);
-        return null;
-      }));
-    } else {
-      doRequest(req);
-    }
-  }
-  
-  private void doRequest(final Req req) {
-    try {
-      SolrClient solrClient = clients.getSolrClient(req);
-      solrClient.request(req.uReq);
-    } catch (Exception e) {
-      SolrException.log(log, e);
-      Error error = new Error();
-      error.e = e;
-      error.req = req;
-      if (e instanceof SolrException) {
-        error.statusCode = ((SolrException) e).code();
-      }
-      errors.add(error);
+              , req.node.getUrl(), req.retries, req.cmd, req.uReq.getParams());
     }
+    clients.getClient(req).request(req);
   }
-  
+
   public static class Req {
     public Node node;
     public UpdateRequest uReq;
@@ -406,11 +328,11 @@ public class SolrCmdDistributor implements Closeable {
     //
     // In the case of a leaderTracker and rollupTracker both being present, then we need to take care when assembling
     // the final response to check both the rollup and leader trackers on the aggregator node.
-    public void trackRequestResult(org.eclipse.jetty.client.api.Response resp, InputStream respBody, boolean success) {
+    public void trackRequestResult(NamedList<Object> rs, boolean success) {
 
       // Returning Integer.MAX_VALUE here means there was no "rf" on the response, therefore we just need to increment
       // our achieved rf if we are a leader, i.e. have a leaderTracker.
-      int rfFromResp = getRfFromResponse(respBody);
+      int rfFromResp = getRfFromResponse(rs);
 
       if (leaderTracker != null && rfFromResp == Integer.MAX_VALUE) {
         leaderTracker.trackRequestResult(node, success);
@@ -421,11 +343,9 @@ public class SolrCmdDistributor implements Closeable {
       }
     }
 
-    private int getRfFromResponse(InputStream inputStream) {
-      if (inputStream != null) {
+    private int getRfFromResponse(NamedList<Object> nl) {
+      if (nl != null) {
         try {
-          BinaryResponseParser brp = new BinaryResponseParser();
-          NamedList<Object> nl = brp.processResponse(inputStream, null);
           Object hdr = nl.get("responseHeader");
           if (hdr != null && hdr instanceof NamedList) {
             @SuppressWarnings({"unchecked"})
@@ -453,6 +373,7 @@ public class SolrCmdDistributor implements Closeable {
   public static class Error {
     public Exception e;
     public int statusCode = -1;
+    public Boolean retriable;
 
     /**
      * NOTE: This is the request that happened to be executed when this error was <b>triggered</b> the error, 
@@ -465,8 +386,8 @@ public class SolrCmdDistributor implements Closeable {
     public String toString() {
       StringBuilder sb = new StringBuilder();
       sb.append("SolrCmdDistributor$Error: statusCode=").append(statusCode);
-      sb.append("; exception=").append(String.valueOf(e));
-      sb.append("; req=").append(String.valueOf(req));
+      sb.append("; exception=").append(e);
+      sb.append("; req=").append(req);
       return sb.toString();
     }
   }
diff --git a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
index c9040c9..a2646cb 100644
--- a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
+++ b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
@@ -26,10 +26,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
-import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.impl.ConcurrentUpdateHttp2SolrClient;
 import org.apache.solr.client.solrj.impl.Http2SolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.update.SolrCmdDistributor.Error;
 import org.eclipse.jetty.client.api.Response;
 import org.slf4j.Logger;
@@ -38,14 +39,10 @@ import org.slf4j.LoggerFactory;
 public class StreamingSolrClients {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private final int runnerCount = Integer.getInteger("solr.cloud.replication.runners", 1);
-  // should be less than solr.jetty.http.idleTimeout
-  private final int pollQueueTime = Integer.getInteger("solr.cloud.client.pollQueueTime", 10000);
-
   private Http2SolrClient httpClient;
 
-  private Map<String, ConcurrentUpdateHttp2SolrClient> solrClients = new HashMap<>();
-  private List<Error> errors = Collections.synchronizedList(new ArrayList<Error>());
+  private Map<String, SingleStreamClient> clients = new HashMap<>();
+  private List<Error> errors = Collections.synchronizedList(new ArrayList<>());
 
   private ExecutorService updateExecutor;
 
@@ -62,35 +59,124 @@ public class StreamingSolrClients {
     errors.clear();
   }
 
-  public synchronized SolrClient getSolrClient(final SolrCmdDistributor.Req req) {
+  public synchronized SingleStreamClient getClient(final SolrCmdDistributor.Req req) {
     String url = getFullUrl(req.node.getUrl());
-    ConcurrentUpdateHttp2SolrClient client = solrClients.get(url);
+    SingleStreamClient client = clients.get(url);
     if (client == null) {
-      // NOTE: increasing to more than 1 threadCount for the client could cause updates to be reordered
-      // on a greater scale since the current behavior is to only increase the number of connections/Runners when
-      // the queue is more than half full.
-      client = new ErrorReportingConcurrentUpdateSolrClient.Builder(url, httpClient, req, errors)
-          .withQueueSize(100)
-          .withThreadCount(runnerCount)
-          .withExecutorService(updateExecutor)
-          .alwaysStreamDeletes()
-          .build();
-      client.setPollQueueTime(pollQueueTime); // minimize connections created
-      solrClients.put(url, client);
+      client = new SingleStreamClient(httpClient, url);
+      clients.put(url, client);
     }
 
     return client;
   }
 
-  public synchronized void blockUntilFinished() throws IOException {
-    for (ConcurrentUpdateHttp2SolrClient client : solrClients.values()) {
-      client.blockUntilFinished();
+  class SingleStreamClient {
+
+    private final Http2SolrClient client;
+    private final String basePath;
+    private Http2SolrClient.OutStream outStream;
+    private SolrCmdDistributor.Req lastReq;
+    private int numRequests = 0;
+
+    public SingleStreamClient(Http2SolrClient client, String basePath) {
+      this.client = client;
+      this.basePath = basePath;
+    }
+
+    public void request(SolrCmdDistributor.Req req) {
+      this.lastReq = req;
+      if (req.synchronous) {
+        closeStream();
+        sendSyncReq(req);
+        return;
+      }
+
+      UpdateRequest request = req.uReq;
+      numRequests++;
+      //TODO datcm serialize the request up-front and reuse it
+      try {
+        if (outStream == null) {
+          outStream = client.initOutStream(basePath, request, request.getCollection());
+        }
+        client.send(outStream, request, request.getCollection());
+      } catch (IOException e) {
+        if (outStream != null) {
+          closeStream();
+        } else {
+          handleError(e, req);
+        }
+      }
+    }
+
+    protected void handleError(Exception ex, SolrCmdDistributor.Req req) {
+      log.error("Error when calling {} to {}", req, req.node.getUrl(), ex);
+      Error error = new Error();
+      error.e = ex;
+      if (ex instanceof SolrException) {
+        error.statusCode = ((SolrException) ex).code();
+      }
+      error.req = req;
+      errors.add(error);
+    }
+
+    public void preFinish() {
+      if (outStream != null) {
+        try {
+          client.closeStream(outStream);
+        } catch (IOException e) {
+          if (numRequests > 1) {
+            handleError(new RuntimeException(), lastReq);
+          } else {
+            handleError(e, lastReq);
+          }
+        }
+      }
+    }
+
+    public void finish() {
+      closeStream();
+    }
+
+    private void sendSyncReq(SolrCmdDistributor.Req req) {
+      try {
+        NamedList<Object> rs = client.request(req.uReq);
+        trackResult(rs);
+      } catch (Exception e) {
+        handleError(e, req);
+      }
+    }
+
+    private void trackResult(NamedList<Object> rs) {
+      lastReq.trackRequestResult(rs, true);
+    }
+
+    /**
+     * Close previous stream and call error handling code
+     */
+    private void closeStream() {
+      if (outStream != null) {
+        try {
+          NamedList<Object> rs = client.closeAndGetResponse(outStream);
+          trackResult(rs);
+        } catch (Exception e) {
+          if (numRequests > 1) {
+            handleError(new RuntimeException(), lastReq);
+          } else {
+            handleError(e, lastReq);
+          }
+        }
+      }
+      outStream = null;
     }
   }
 
-  public synchronized void shutdown() {
-    for (ConcurrentUpdateHttp2SolrClient client : solrClients.values()) {
-      client.close();
+  public synchronized void finishStreams() {
+    //TODO nococmmit SOLR-13975
+    for (SingleStreamClient client : clients.values()) {
+      client.preFinish();
+    }
+    for (SingleStreamClient client : clients.values()) {
+      client.closeStream();
     }
   }
 
@@ -108,54 +194,4 @@ public class StreamingSolrClients {
     return httpClient;
   }
 
-  public ExecutorService getUpdateExecutor() {
-    return updateExecutor;
-  }
-}
-
-class ErrorReportingConcurrentUpdateSolrClient extends ConcurrentUpdateHttp2SolrClient {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private final SolrCmdDistributor.Req req;
-  private final List<Error> errors;
-
-  public ErrorReportingConcurrentUpdateSolrClient(Builder builder) {
-    super(builder);
-    this.req = builder.req;
-    this.errors = builder.errors;
-  }
-
-  @Override
-  public void handleError(Throwable ex) {
-    log.error("Error when calling {} to {}", req, req.node.getUrl(), ex);
-    Error error = new Error();
-    error.e = (Exception) ex;
-    if (ex instanceof SolrException) {
-      error.statusCode = ((SolrException) ex).code();
-    }
-    error.req = req;
-    errors.add(error);
-    if (!req.shouldRetry(error)) {
-      // only track the error if we are not retrying the request
-      req.trackRequestResult(null, null, false);
-    }
-  }
-  @Override
-  public void onSuccess(Response resp, InputStream respBody) {
-    req.trackRequestResult(resp, respBody, true);
-  }
-
-  static class Builder extends ConcurrentUpdateHttp2SolrClient.Builder {
-    protected SolrCmdDistributor.Req req;
-    protected List<Error> errors;
-
-    public Builder(String baseSolrUrl, Http2SolrClient client, SolrCmdDistributor.Req req, List<Error> errors) {
-      super(baseSolrUrl, client);
-      this.req = req;
-      this.errors = errors;
-    }
-
-    public ErrorReportingConcurrentUpdateSolrClient build() {
-      return new ErrorReportingConcurrentUpdateSolrClient(this);
-    }
-  }
 }
diff --git a/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java b/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java
index bfa6221..a19ca75 100644
--- a/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java
+++ b/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java
@@ -23,6 +23,7 @@ import org.apache.lucene.util.LuceneTestCase;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.Http2SolrClient;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.util.NamedList;
 
@@ -35,13 +36,13 @@ public class MockStreamingSolrClients extends StreamingSolrClients {
   public MockStreamingSolrClients(UpdateShardHandler updateShardHandler) {
     super(updateShardHandler);
   }
-  
+
   @Override
-  public synchronized SolrClient getSolrClient(final SolrCmdDistributor.Req req) {
-    SolrClient client = super.getSolrClient(req);
-    return new MockSolrClient(client);
+  public synchronized SingleStreamClient getClient(SolrCmdDistributor.Req req) {
+    SingleStreamClient client = super.getClient(req);
+    return new MockClient(client);
   }
-  
+
   public void setExp(Exp exp) {
     this.exp = exp;
   }
@@ -60,37 +61,41 @@ public class MockStreamingSolrClients extends StreamingSolrClients {
     return null;
   }
 
-  class MockSolrClient extends SolrClient {
+  class MockClient extends SingleStreamClient {
+    SingleStreamClient client;
 
-    private SolrClient solrClient;
-
-    public MockSolrClient(SolrClient solrClient) {
-      this.solrClient = solrClient;
+    public MockClient(SingleStreamClient client) {
+      super(null, null);
+      this.client = client;
     }
-    
+
     @Override
-    public NamedList<Object> request(@SuppressWarnings({"rawtypes"})SolrRequest request, String collection)
-        throws SolrServerException, IOException {
+    public void request(SolrCmdDistributor.Req req) {
       if (exp != null) {
         Exception e = exception();
         if (e instanceof IOException) {
           if (LuceneTestCase.random().nextBoolean()) {
-            throw (IOException)e;
+            this.client.handleError(e, req);
+            return;
           } else {
-            throw new SolrServerException(e);
+            this.client.handleError(e, req);
+            return;
           }
         } else if (e instanceof SolrServerException) {
-          throw (SolrServerException)e;
+          this.client.handleError(e, req);
+          return;
         } else {
-          throw new SolrServerException(e);
+          this.client.handleError(e, req);
+          return;
         }
       }
-      
-      return solrClient.request(request);
+      this.client.request(req);
     }
 
     @Override
-    public void close() {}
-    
+    public void finish() {
+      this.client.finish();
+    }
   }
+
 }
diff --git a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
index 77f5a89..c4bc9e4 100644
--- a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
@@ -19,6 +19,7 @@ package org.apache.solr.update;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.net.SocketException;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -61,11 +62,14 @@ import org.apache.solr.util.TestInjection;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.xml.sax.SAXException;
 
 // See: https://issues.apache.org/jira/browse/SOLR-12028 Tests cannot remove files on Windows machines occasionally
 public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
-  
+
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static enum NodeType {FORWARD, STANDARD};
   
   private AtomicInteger id = new AtomicInteger();
@@ -359,7 +363,6 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
     testDeletes(false, false);
     testDeletes(true, true);
     testDeletes(true, false);
-    getRfFromResponseShouldNotCloseTheInputStream();
     testStuckUpdates();
   }
   
@@ -540,22 +543,6 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
     assertFalse(req.shouldRetry(err));
   }
 
-  public void getRfFromResponseShouldNotCloseTheInputStream() {
-    UpdateRequest dbqReq = new UpdateRequest();
-    dbqReq.deleteByQuery("*:*");
-    SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 1), dbqReq, true);
-    AtomicBoolean isClosed = new AtomicBoolean(false);
-    ByteArrayInputStream is = new ByteArrayInputStream(new byte[100]) {
-      @Override
-      public void close() throws IOException {
-        isClosed.set(true);
-        super.close();
-      }
-    };
-    req.trackRequestResult(null, is, true);
-    assertFalse("Underlying stream should not be closed!", isClosed.get());
-  }
-  
   private void testReqShouldRetryMaxRetries() {
     Error err = getError(new SocketException()); 
     SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 1), new UpdateRequest(), true);
@@ -787,6 +774,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
   }
 
   private void testRetryNodeAgainstBadAddress() throws SolrServerException, IOException {
+    log.info("Datcm start test");
     // Test RetryNode
     try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(updateShardHandler)) {
       final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 218a096..82788b0 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -266,14 +266,22 @@ public class Http2SolrClient extends SolrClient {
     private final OutputStreamContentProvider outProvider;
     private final InputStreamResponseListener responseListener;
     private final boolean isXml;
+    private final boolean isV2Api;
+    private final String destUri;
+    private boolean closed;
+    private long closedTime;
 
     public OutStream(String origCollection, ModifiableSolrParams origParams,
-                     OutputStreamContentProvider outProvider, InputStreamResponseListener responseListener, boolean isXml) {
+                     OutputStreamContentProvider outProvider, InputStreamResponseListener responseListener,
+                     String destUri,
+                     boolean isXml, boolean isV2Api) {
       this.origCollection = origCollection;
       this.origParams = origParams;
       this.outProvider = outProvider;
       this.responseListener = responseListener;
       this.isXml = isXml;
+      this.isV2Api = isV2Api;
+      this.destUri = destUri;
     }
 
     boolean belongToThisStream(@SuppressWarnings({"rawtypes"})SolrRequest solrRequest, String collection) {
@@ -294,10 +302,23 @@ public class Http2SolrClient extends SolrClient {
 
     @Override
     public void close() throws IOException {
+      if (closed) {
+        return;
+      }
       if (isXml) {
         write("</stream>".getBytes(FALLBACK_CHARSET));
       }
       this.outProvider.getOutputStream().close();
+      closed = true;
+      closedTime = System.nanoTime();
+    }
+
+    private long getIdleTimeout(long idleTimeout) {
+      long timePassed = (System.nanoTime() - closedTime) / 1000;
+      if (timePassed >= idleTimeout) {
+        return 0;
+      }
+      return idleTimeout - timePassed;
     }
 
     //TODO this class should be hidden
@@ -337,7 +358,8 @@ public class Http2SolrClient extends SolrClient {
 
     boolean isXml = ClientUtils.TEXT_XML.equals(requestWriter.getUpdateContentType());
     OutStream outStream = new OutStream(collection, origParams, provider, responseListener,
-        isXml);
+            postRequest.getURI().toString(),
+            isXml, isV2ApiRequest(updateRequest));
     if (isXml) {
       outStream.write("<stream>".getBytes(FALLBACK_CHARSET));
     }
@@ -380,6 +402,7 @@ public class Http2SolrClient extends SolrClient {
       asyncListener.onFailure(e);
       return FAILED_MAKING_REQUEST_CANCELLABLE;
     }
+    final boolean isV2Api = isV2ApiRequest(solrRequest);
     final ResponseParser parser = solrRequest.getResponseParser() == null
         ? this.parser: solrRequest.getResponseParser();
     req.onRequestQueued(asyncTracker.queuedListener)
@@ -393,7 +416,7 @@ public class Http2SolrClient extends SolrClient {
               InputStream is = listener.getInputStream();
               assert ObjectReleaseTracker.track(is);
               try {
-                NamedList<Object> body = processErrorsAndResponse(solrRequest, parser, response, is);
+                NamedList<Object> body = processErrorsAndResponse(parser, response, is, isV2Api);
                 asyncListener.onSuccess(body);
               } catch (RemoteSolrException e) {
                 if (SolrException.getRootCause(e) != CANCELLED_EXCEPTION) {
@@ -416,43 +439,60 @@ public class Http2SolrClient extends SolrClient {
     return () -> req.abort(CANCELLED_EXCEPTION);
   }
 
-  @Override
-  public NamedList<Object> request(@SuppressWarnings({"rawtypes"}) SolrRequest solrRequest, String collection) throws SolrServerException, IOException {
-    Request req = makeRequest(solrRequest, collection);
-    final ResponseParser parser = solrRequest.getResponseParser() == null
-        ? this.parser: solrRequest.getResponseParser();
+  public void closeStream(OutStream outStream) throws IOException {
+    outStream.close();
+  }
+
+  public NamedList<Object> closeAndGetResponse(OutStream outStream) throws IOException, SolrServerException {
+    outStream.close();
+    return getResponseFrom(outStream.responseListener, outStream.destUri, outStream.isV2Api, null, outStream.getIdleTimeout(idleTimeout));
+  }
 
+  private NamedList<Object> getResponseFrom(InputStreamResponseListener listener,
+                                            String destUri, boolean isV2Api,
+                                            ResponseParser parser, long idleTimeout) throws SolrServerException, IOException {
     try {
-      InputStreamResponseListener listener = new InputStreamResponseListener();
-      req.send(listener);
       Response response = listener.get(idleTimeout, TimeUnit.MILLISECONDS);
       InputStream is = listener.getInputStream();
       assert ObjectReleaseTracker.track(is);
-
-      return processErrorsAndResponse(solrRequest, parser, response, is);
+      if (parser == null) {
+        parser = this.parser;
+      }
+      return processErrorsAndResponse(parser, response, is, isV2Api);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException(e);
     } catch (TimeoutException e) {
       throw new SolrServerException(
-          "Timeout occured while waiting response from server at: " + req.getURI(), e);
+              "Timeout occured while waiting response from server at: " + destUri, e);
     } catch (ExecutionException e) {
       Throwable cause = e.getCause();
       if (cause instanceof ConnectException) {
-        throw new SolrServerException("Server refused connection at: " + req.getURI(), cause);
+        throw new SolrServerException("Server refused connection at: " + destUri, cause);
       }
       if (cause instanceof SolrServerException) {
         throw (SolrServerException) cause;
       } else if (cause instanceof IOException) {
         throw new SolrServerException(
-            "IOException occured when talking to server at: " + getBaseURL(), cause);
+                "IOException occured when talking to server at: " + destUri, cause);
       }
       throw new SolrServerException(cause.getMessage(), cause);
     }
   }
 
-  private NamedList<Object> processErrorsAndResponse(@SuppressWarnings({"rawtypes"})SolrRequest solrRequest,
-                                                     ResponseParser parser, Response response, InputStream is) throws SolrServerException {
+  @Override
+  public NamedList<Object> request(@SuppressWarnings({"rawtypes"}) SolrRequest solrRequest, String collection) throws SolrServerException, IOException {
+    Request req = makeRequest(solrRequest, collection);
+    final ResponseParser parser = solrRequest.getResponseParser() == null
+        ? this.parser: solrRequest.getResponseParser();
+
+    InputStreamResponseListener listener = new InputStreamResponseListener();
+    req.send(listener);
+    return getResponseFrom(listener, req.getURI().toString(), isV2ApiRequest(solrRequest), parser, idleTimeout);
+  }
+
+  private NamedList<Object> processErrorsAndResponse(ResponseParser parser, Response response,
+                                                     InputStream is, boolean isV2Api) throws SolrServerException {
     ContentType contentType = getContentType(response);
     String mimeType = null;
     String encoding = null;
@@ -460,7 +500,7 @@ public class Http2SolrClient extends SolrClient {
       mimeType = contentType.getMimeType();
       encoding = contentType.getCharset() != null? contentType.getCharset().name() : null;
     }
-    return processErrorsAndResponse(response, parser, is, mimeType, encoding, isV2ApiRequest(solrRequest));
+    return processErrorsAndResponse(response, parser, is, mimeType, encoding, isV2Api);
   }
 
   private ContentType getContentType(Response response) {