Posted to commits@lucene.apache.org by da...@apache.org on 2018/08/10 09:13:43 UTC

[03/31] lucene-solr:jira/http2: SOLR-11881: Retry update requests from leaders to followers

SOLR-11881: Retry update requests from leaders to followers


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/c338cf61
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/c338cf61
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/c338cf61

Branch: refs/heads/jira/http2
Commit: c338cf61e7baba4908c31e02beda47ae3e201752
Parents: ea2e564
Author: Tomas Fernandez Lobbe <tf...@apache.org>
Authored: Mon Aug 6 15:56:49 2018 -0700
Committer: Tomas Fernandez Lobbe <tf...@apache.org>
Committed: Mon Aug 6 15:56:49 2018 -0700

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   2 +
 .../apache/solr/update/SolrCmdDistributor.java  | 230 ++++++++----
 .../solr/update/StreamingSolrClients.java       |   5 +-
 .../processor/DistributedUpdateProcessor.java   |  36 +-
 .../TimeRoutedAliasUpdateProcessor.java         |   4 +-
 .../solr/update/MockStreamingSolrClients.java   |  27 +-
 .../solr/update/SolrCmdDistributorTest.java     | 370 +++++++++++++++++--
 7 files changed, 540 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
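
At a glance, the commit replaces the distributor-wide maxRetriesOnForward setting with a per-node retry budget: SolrCmdDistributor now takes only a retryPause, and each Node is built with its own maxRetries and decides per error whether a retry makes sense. The following plain-Java sketch is not part of the patch; it only lines up the new constructor signatures that appear in the diff below, and clients, nodeProps, zkStateReader and the collection/shard names are placeholders:

    // Sketch only -- new signatures introduced by this commit.
    SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(clients, /* retryPause ms */ 500);

    // Leader -> follower replication: retried up to 3 times by default
    // (MAX_RETRIES_TO_FOLLOWERS_DEFAULT in DistributedUpdateProcessor).
    Node follower = new StdNode(nodeProps, "collection1", "shard1", 3);

    // Forward-to-leader (the former RetryNode, now ForwardNode): retried up to
    // 25 times by default and re-resolves the current leader before retrying.
    Node leader = new ForwardNode(nodeProps, zkStateReader, "collection1", "shard1", 25);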


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c338cf61/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 605e837..5fb1f88 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -233,6 +233,8 @@ Optimizations
   introduced (splitMethod=link) which uses hard-linking of index files when possible, resulting in
   significant speedups and reduced CPU / IO load on shard leader. (ab)
 
+* SOLR-11881: Retry update requests sent by leaders to its followers (Varun Thacker, Mark Miller, Tomás Fernández Löbbe)
+
 Other Changes
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c338cf61/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index 80e2253..d5aafec 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -17,12 +17,28 @@
 package org.apache.solr.update;
 
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.ConnectException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
 import org.apache.http.HttpResponse;
+import org.apache.http.NoHttpResponseException;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.BinaryResponseParser;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient; // jdoc
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrException;
@@ -32,37 +48,21 @@ import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.Diagnostics;
 import org.apache.solr.update.processor.DistributedUpdateProcessor;
-import org.apache.solr.update.processor.DistributedUpdateProcessor.RollupRequestReplicationTracker;
 import org.apache.solr.update.processor.DistributedUpdateProcessor.LeaderRequestReplicationTracker;
+import org.apache.solr.update.processor.DistributedUpdateProcessor.RollupRequestReplicationTracker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.invoke.MethodHandles;
-import java.net.ConnectException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-
 /**
  * Used for distributing commands from a shard leader to its replicas.
  */
 public class SolrCmdDistributor implements Closeable {
-  private static final int MAX_RETRIES_ON_FORWARD = 25;
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   
   private StreamingSolrClients clients;
   private boolean finished = false; // see finish()
 
   private int retryPause = 500;
-  private int maxRetriesOnForward = MAX_RETRIES_ON_FORWARD;
   
   private final List<Error> allErrors = new ArrayList<>();
   private final List<Error> errors = Collections.synchronizedList(new ArrayList<Error>());
@@ -79,9 +79,8 @@ public class SolrCmdDistributor implements Closeable {
     this.completionService = new ExecutorCompletionService<>(updateShardHandler.getUpdateExecutor());
   }
   
-  public SolrCmdDistributor(StreamingSolrClients clients, int maxRetriesOnForward, int retryPause) {
+  public SolrCmdDistributor(StreamingSolrClients clients, int retryPause) {
     this.clients = clients;
-    this.maxRetriesOnForward = maxRetriesOnForward;
     this.retryPause = retryPause;
     completionService = new ExecutorCompletionService<>(clients.getUpdateExecutor());
   }
@@ -107,55 +106,63 @@ public class SolrCmdDistributor implements Closeable {
     List<Error> errors = new ArrayList<>(this.errors);
     errors.addAll(clients.getErrors());
     List<Error> resubmitList = new ArrayList<>();
+    
+    if (log.isInfoEnabled() && errors.size() > 0) {
+      log.info("SolrCmdDistributor found {} errors", errors.size());
+    }
+    
+    if (log.isDebugEnabled() && errors.size() > 0) {
+      StringBuilder builder = new StringBuilder("SolrCmdDistributor found:");
+      int maxErrorsToShow = 10;
+      for (Error e:errors) {
+        if (maxErrorsToShow-- <= 0) break;
+        builder.append("\n" + e);
+      }
+      if (errors.size() > 10) {
+        builder.append("\n... and ");
+        builder.append(errors.size() - 10);
+        builder.append(" more");
+      }
+      log.debug(builder.toString());
+    }
 
     for (Error err : errors) {
       try {
         String oldNodeUrl = err.req.node.getUrl();
         
-        // if there is a retry url, we want to retry...
-        boolean isRetry = err.req.node.checkRetry();
-        
-        boolean doRetry = false;
-        int rspCode = err.statusCode;
+        /*
+         * if this is a retryable request we may want to retry, depending on the error we received and
+         * the number of times we have already retried
+         */
+        boolean isRetry = err.req.shouldRetry(err);
         
         if (testing_errorHook != null) Diagnostics.call(testing_errorHook,
             err.e);
         
         // this can happen in certain situations such as close
         if (isRetry) {
-          if (rspCode == 404 || rspCode == 403 || rspCode == 503) {
-            doRetry = true;
-          }
-          
-          // if it's a connect exception, lets try again
-          if (err.e instanceof SolrServerException) {
-            if (((SolrServerException) err.e).getRootCause() instanceof ConnectException) {
-              doRetry = true;
-            }
-          }
-          
-          if (err.e instanceof ConnectException) {
-            doRetry = true;
-          }
-          
-          if (err.req.retries < maxRetriesOnForward && doRetry) {
-            err.req.retries++;
-            
+          err.req.retries++;
+
+          if (err.req.node instanceof ForwardNode) {
             SolrException.log(SolrCmdDistributor.log, "forwarding update to "
                 + oldNodeUrl + " failed - retrying ... retries: "
-                + err.req.retries + " " + err.req.cmd.toString() + " params:"
-                + err.req.uReq.getParams() + " rsp:" + rspCode, err.e);
-            try {
-              Thread.sleep(retryPause);
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
-              log.warn(null, e);
-            }
-            
-            resubmitList.add(err);
+                + err.req.retries + "/" + err.req.node.getMaxRetries() + ". "
+                + err.req.cmd.toString() + " params:"
+                + err.req.uReq.getParams() + " rsp:" + err.statusCode, err.e);
           } else {
-            allErrors.add(err);
+            SolrException.log(SolrCmdDistributor.log, "FROMLEADER request to "
+                + oldNodeUrl + " failed - retrying ... retries: "
+                + err.req.retries + "/" + err.req.node.getMaxRetries() + ". "
+                + err.req.cmd.toString() + " params:"
+                + err.req.uReq.getParams() + " rsp:" + err.statusCode, err.e);
+          }
+          try {
+            Thread.sleep(retryPause); //TODO: Do we want this wait for every error?
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.warn(null, e);
           }
+          resubmitList.add(err);
         } else {
           allErrors.add(err);
         }
@@ -184,6 +191,10 @@ public class SolrCmdDistributor implements Closeable {
                             RollupRequestReplicationTracker rollupTracker,
                             LeaderRequestReplicationTracker leaderTracker) throws IOException {
     
+    if (!cmd.isDeleteById()) {
+      blockAndDoRetries(); // For DBQ, flush all writes before submitting
+    }
+    
     for (Node node : nodes) {
       UpdateRequest uReq = new UpdateRequest();
       uReq.setParams(params);
@@ -193,7 +204,6 @@ public class SolrCmdDistributor implements Closeable {
       } else {
         uReq.deleteByQuery(cmd.query);
       }
-
       submit(new Req(cmd, node, uReq, sync, rollupTracker, leaderTracker), false);
     }
   }
@@ -346,6 +356,16 @@ public class SolrCmdDistributor implements Closeable {
       this.leaderTracker = leaderTracker;
     }
     
+    /**
+     * @return true if this request should be retried after receiving a particular error
+     *         false otherwise
+     */
+    public boolean shouldRetry(Error err) {
+      boolean isRetry = node.checkRetry(err);
+      isRetry &= uReq.getDeleteQuery() == null || uReq.getDeleteQuery().isEmpty(); //Don't retry DBQs 
+      return isRetry && retries < node.getMaxRetries();
+    }
+    
     public String toString() {
       StringBuilder sb = new StringBuilder();
       sb.append("SolrCmdDistributor$Req: cmd=").append(cmd.toString());
@@ -440,27 +460,36 @@ public class SolrCmdDistributor implements Closeable {
   
   public static abstract class Node {
     public abstract String getUrl();
-    public abstract boolean checkRetry();
+    public abstract boolean checkRetry(Error e);
     public abstract String getCoreName();
     public abstract String getBaseUrl();
     public abstract ZkCoreNodeProps getNodeProps();
     public abstract String getCollection();
     public abstract String getShardId();
+    public abstract int getMaxRetries();
   }
 
   public static class StdNode extends Node {
     protected ZkCoreNodeProps nodeProps;
     protected String collection;
     protected String shardId;
+    private final boolean retry;
+    private final int maxRetries;
 
     public StdNode(ZkCoreNodeProps nodeProps) {
-      this(nodeProps, null, null);
+      this(nodeProps, null, null, 0);
     }
     
-    public StdNode(ZkCoreNodeProps nodeProps, String collection, String shardId) {    
+    public StdNode(ZkCoreNodeProps nodeProps, String collection, String shardId) {
+      this(nodeProps, collection, shardId, 0);
+    }
+    
+    public StdNode(ZkCoreNodeProps nodeProps, String collection, String shardId, int maxRetries) {
       this.nodeProps = nodeProps;
       this.collection = collection;
       this.shardId = shardId;
+      this.retry = maxRetries > 0;
+      this.maxRetries = maxRetries;
     }
     
     public String getCollection() {
@@ -482,9 +511,33 @@ public class SolrCmdDistributor implements Closeable {
     }
 
     @Override
-    public boolean checkRetry() {
+    public boolean checkRetry(Error err) {
+      if (!retry) return false;
+      
+      if (err.statusCode == 404 || err.statusCode == 403 || err.statusCode == 503) {
+        return true;
+      }
+      
+      // if it's a connect exception, lets try again
+      if (err.e instanceof SolrServerException) {
+        if (isRetriableException(((SolrServerException) err.e).getRootCause())) {
+          return true;
+        }
+      } else {
+        if (isRetriableException(err.e)) {
+          return true;
+        }
+      }
       return false;
     }
+    
+    /**
+     * @return true if Solr should retry in case of hitting this exception
+     *         false otherwise
+     */
+    private boolean isRetriableException(Throwable t) {
+      return t instanceof SocketException || t instanceof NoHttpResponseException || t instanceof SocketTimeoutException;
+    }
 
     @Override
     public String getBaseUrl() {
@@ -506,6 +559,8 @@ public class SolrCmdDistributor implements Closeable {
       result = prime * result + ((baseUrl == null) ? 0 : baseUrl.hashCode());
       result = prime * result + ((coreName == null) ? 0 : coreName.hashCode());
       result = prime * result + ((url == null) ? 0 : url.hashCode());
+      result = prime * result + Boolean.hashCode(retry);
+      result = prime * result + Integer.hashCode(maxRetries);
       return result;
     }
 
@@ -515,6 +570,8 @@ public class SolrCmdDistributor implements Closeable {
       if (obj == null) return false;
       if (getClass() != obj.getClass()) return false;
       StdNode other = (StdNode) obj;
+      if (this.retry != other.retry) return false;
+      if (this.maxRetries != other.maxRetries) return false;
       String baseUrl = nodeProps.getBaseUrl();
       String coreName = nodeProps.getCoreName();
       String url = nodeProps.getCoreUrl();
@@ -534,39 +591,56 @@ public class SolrCmdDistributor implements Closeable {
     public ZkCoreNodeProps getNodeProps() {
       return nodeProps;
     }
+
+    @Override
+    public int getMaxRetries() {
+      return this.maxRetries;
+    }
   }
   
   // RetryNodes are used in the case of 'forward to leader' where we want
   // to try the latest leader on a fail in the case the leader just went down.
-  public static class RetryNode extends StdNode {
+  public static class ForwardNode extends StdNode {
     
     private ZkStateReader zkStateReader;
     
-    public RetryNode(ZkCoreNodeProps nodeProps, ZkStateReader zkStateReader, String collection, String shardId) {
-      super(nodeProps, collection, shardId);
+    public ForwardNode(ZkCoreNodeProps nodeProps, ZkStateReader zkStateReader, String collection, String shardId, int maxRetries) {
+      super(nodeProps, collection, shardId, maxRetries);
       this.zkStateReader = zkStateReader;
       this.collection = collection;
       this.shardId = shardId;
     }
 
     @Override
-    public boolean checkRetry() {
-      ZkCoreNodeProps leaderProps;
-      try {
-        leaderProps = new ZkCoreNodeProps(zkStateReader.getLeaderRetry(
-            collection, shardId));
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        return false;
-      } catch (Exception e) {
-        // we retry with same info
-        log.warn(null, e);
-        return true;
+    public boolean checkRetry(Error err) {
+      boolean doRetry = false;
+      if (err.statusCode == 404 || err.statusCode == 403 || err.statusCode == 503) {
+        doRetry = true;
       }
-     
-      this.nodeProps = leaderProps;
       
-      return true;
+      // if it's a connect exception, lets try again
+      if (err.e instanceof SolrServerException && ((SolrServerException) err.e).getRootCause() instanceof ConnectException) {
+        doRetry = true;
+      } else if (err.e instanceof ConnectException) {
+        doRetry = true;
+      }
+      if (doRetry) {
+        ZkCoreNodeProps leaderProps;
+        try {
+          leaderProps = new ZkCoreNodeProps(zkStateReader.getLeaderRetry(
+              collection, shardId));
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return false;
+        } catch (Exception e) {
+          // we retry with same info
+          log.warn(null, e);
+          return true;
+        }
+       
+        this.nodeProps = leaderProps;
+      }
+      return doRetry;
     }
 
     @Override
@@ -584,7 +658,7 @@ public class SolrCmdDistributor implements Closeable {
       if (this == obj) return true;
       if (!super.equals(obj)) return false;
       if (getClass() != obj.getClass()) return false;
-      RetryNode other = (RetryNode) obj;
+      ForwardNode other = (ForwardNode) obj;
       if (nodeProps.getCoreUrl() == null) {
         if (other.nodeProps.getCoreUrl() != null) return false;
       } else if (!nodeProps.getCoreUrl().equals(other.nodeProps.getCoreUrl())) return false;
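
Condensed from the SolrCmdDistributor hunks above: retry decisions now happen in two layers. The Node classifies the Error (HTTP 404/403/503 or a connection-level exception; ForwardNode additionally re-resolves the shard leader before retrying), while Req.shouldRetry() adds the delete-by-query exclusion and the per-node retry budget. A simplified restatement of the resulting control flow in doRetriesIfNeeded(), with logging and the test hook omitted:

    // Simplified retry loop -- the full version (with logging) is in the hunk above.
    for (Error err : errors) {
      if (err.req.shouldRetry(err)) {   // node.checkRetry(err) && not a DBQ && retries < maxRetries
        err.req.retries++;
        try {
          Thread.sleep(retryPause);     // back off before resubmitting
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        resubmitList.add(err);          // queued for resubmission after the loop
      } else {
        allErrors.add(err);             // non-retriable or budget exhausted: surface to the caller
      }
    }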

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c338cf61/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
index eb4caec..eb92540 100644
--- a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
+++ b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
@@ -143,7 +143,6 @@ class ErrorReportingConcurrentUpdateSolrClient extends ConcurrentUpdateSolrClien
   
   @Override
   public void handleError(Throwable ex) {
-    req.trackRequestResult(null, false);
     log.error("error", ex);
     Error error = new Error();
     error.e = (Exception) ex;
@@ -152,6 +151,10 @@ class ErrorReportingConcurrentUpdateSolrClient extends ConcurrentUpdateSolrClien
     }
     error.req = req;
     errors.add(error);
+    if (!req.shouldRetry(error)) {
+      // only track the error if we are not retrying the request
+      req.trackRequestResult(null, false);
+    }
   }
   @Override
   public void onSuccess(HttpResponse resp) {
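
The StreamingSolrClients change above ties into the same mechanism: a failed request is only reported to the replication-factor tracker once it is final, that is, once Req.shouldRetry() says no further retry will be attempted. Condensed from handleError() above (error construction unchanged):

    // Record the error, but defer the failure accounting until the
    // request will not be retried again.
    errors.add(error);
    if (!req.shouldRetry(error)) {
      req.trackRequestResult(null, false);
    }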

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c338cf61/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index a21d906..e1e7968 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -83,7 +83,7 @@ import org.apache.solr.update.DeleteUpdateCommand;
 import org.apache.solr.update.SolrCmdDistributor;
 import org.apache.solr.update.SolrCmdDistributor.Error;
 import org.apache.solr.update.SolrCmdDistributor.Node;
-import org.apache.solr.update.SolrCmdDistributor.RetryNode;
+import org.apache.solr.update.SolrCmdDistributor.ForwardNode;
 import org.apache.solr.update.SolrCmdDistributor.StdNode;
 import org.apache.solr.update.SolrIndexSplitter;
 import org.apache.solr.update.UpdateCommand;
@@ -114,6 +114,16 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   /**
+   * Requests forwarded to a leader of a different shard will be retried up to this number of times by default
+   */
+  static final int MAX_RETRIES_ON_FORWARD_DEAULT = 25;
+  
+  /**
+   * Requests from a leader to its followers will be retried this number of times by default
+   */
+  static final int MAX_RETRIES_TO_FOLLOWERS_DEFAULT = 3;
+
+  /**
    * Values this processor supports for the <code>DISTRIB_UPDATE_PARAM</code>.
    * This is an implementation detail exposed solely for tests.
    * 
@@ -175,6 +185,15 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   private List<Node> nodes;
   private Set<String> skippedCoreNodeNames;
   private boolean isIndexChanged = false;
+  
+  /**
+   * Number of times requests forwarded to some other shard's leader can be retried
+   */
+  private final int maxRetriesOnForward = MAX_RETRIES_ON_FORWARD_DEAULT;
+  /**
+   * Number of times requests from leaders to followers can be retried
+   */
+  private final int maxRetriesToFollowers = MAX_RETRIES_TO_FOLLOWERS_DEFAULT;
 
   private UpdateCommand updateCommand;  // the current command this processor is working on.
     
@@ -368,7 +387,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           } else if (!clusterState.getLiveNodes().contains(replica.getNodeName()) || replica.getState() == Replica.State.DOWN) {
             skippedCoreNodeNames.add(replica.getName());
           } else {
-            nodes.add(new StdNode(new ZkCoreNodeProps(replica), collection, shardId));
+            nodes.add(new StdNode(new ZkCoreNodeProps(replica), collection, shardId, maxRetriesToFollowers));
           }
         }
         return nodes;
@@ -377,7 +396,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
         // I need to forward on to the leader...
         forwardToLeader = true;
         return Collections.singletonList(
-            new RetryNode(new ZkCoreNodeProps(leaderReplica), zkController.getZkStateReader(), collection, shardId));
+            new ForwardNode(new ZkCoreNodeProps(leaderReplica), zkController.getZkStateReader(), collection, shardId, maxRetriesOnForward));
       }
 
     } catch (InterruptedException e) {
@@ -779,7 +798,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     Set<String> replicasShouldBeInLowerTerms = new HashSet<>();
     for (final SolrCmdDistributor.Error error : errors) {
       
-      if (error.req.node instanceof RetryNode) {
+      if (error.req.node instanceof ForwardNode) {
         // if it's a forward, any fail is a problem - 
         // otherwise we assume things are fine if we got it locally
         // until we start allowing min replication param
@@ -1538,7 +1557,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           // don't forward to ourself
           leaderForAnyShard = true;
         } else {
-          leaders.add(new RetryNode(coreLeaderProps, zkController.getZkStateReader(), collection, sliceName));
+          leaders.add(new ForwardNode(coreLeaderProps, zkController.getZkStateReader(), collection, sliceName, maxRetriesOnForward));
         }
       }
 
@@ -2100,7 +2119,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     private int achievedRf = Integer.MAX_VALUE;
     private final int requestedRf;
 
-    RollupRequestReplicationTracker(String minRepFact) {
+    public RollupRequestReplicationTracker(String minRepFact) {
       try {
         this.requestedRf = Integer.parseInt(minRepFact);
       } catch (NumberFormatException nfe) {
@@ -2153,7 +2172,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
       return requestedRf;
     }
 
-    LeaderRequestReplicationTracker(String shardId, int requestedRf) {
+    public LeaderRequestReplicationTracker(String shardId, int requestedRf) {
       this.requestedRf = requestedRf;
       this.myShardId = shardId;
     }
@@ -2165,8 +2184,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
     public void trackRequestResult(Node node, boolean success) {
       if (log.isDebugEnabled()) {
-        log.debug("trackRequestResult(" + node + "): success? " + success +
-            ", shardId=" + myShardId);
+        log.debug("trackRequestResult({}): success? {}, shardId={}", node, success, myShardId);
       }
 
       if (success) {
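
Putting the DistributedUpdateProcessor changes together: the processor now owns the two retry budgets and hands them to the nodes it builds, so the distributor no longer needs a global maxRetriesOnForward. A condensed juxtaposition of the two call sites shown above (variable names as in the patch):

    // Leader distributing an update to its own replicas: up to
    // maxRetriesToFollowers (MAX_RETRIES_TO_FOLLOWERS_DEFAULT = 3) retries.
    nodes.add(new StdNode(new ZkCoreNodeProps(replica), collection, shardId,
        maxRetriesToFollowers));

    // Forwarding an update to another shard's leader: up to
    // maxRetriesOnForward (MAX_RETRIES_ON_FORWARD_DEAULT = 25) retries, with
    // ForwardNode re-resolving the current leader on each retry.
    nodes.add(new ForwardNode(new ZkCoreNodeProps(leaderReplica),
        zkController.getZkStateReader(), collection, shardId, maxRetriesOnForward));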

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c338cf61/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
index 1d2d730..cd4ed00 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
@@ -401,8 +401,8 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
       throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
           "No 'leader' replica available for shard " + slice.getName() + " of collection " + collection);
     }
-    return new SolrCmdDistributor.RetryNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(),
-        collection, slice.getName());
+    return new SolrCmdDistributor.ForwardNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(),
+        collection, slice.getName(), DistributedUpdateProcessor.MAX_RETRIES_ON_FORWARD_DEAULT);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c338cf61/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java b/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java
index 72d39ff..c269c9e 100644
--- a/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java
+++ b/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java
@@ -16,19 +16,19 @@
  */
 package org.apache.solr.update;
 
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketException;
 import org.apache.lucene.util.LuceneTestCase;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.util.NamedList;
 
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.SocketException;
-
 public class MockStreamingSolrClients extends StreamingSolrClients {
   
-  public enum Exp {CONNECT_EXCEPTION, SOCKET_EXCEPTION};
+  public enum Exp {CONNECT_EXCEPTION, SOCKET_EXCEPTION, BAD_REQUEST};
   
   private volatile Exp exp = null;
   
@@ -46,12 +46,14 @@ public class MockStreamingSolrClients extends StreamingSolrClients {
     this.exp = exp;
   }
 
-  private IOException exception() {
+  private Exception exception() {
     switch (exp) {
       case CONNECT_EXCEPTION:
         return new ConnectException();
       case SOCKET_EXCEPTION:
         return new SocketException();
+      case BAD_REQUEST:
+        return new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Bad Request");
       default:
         break;
     }
@@ -70,10 +72,17 @@ public class MockStreamingSolrClients extends StreamingSolrClients {
     public NamedList<Object> request(SolrRequest request, String collection)
         throws SolrServerException, IOException {
       if (exp != null) {
-        if (LuceneTestCase.random().nextBoolean()) {
-          throw exception();
+        Exception e = exception();
+        if (e instanceof IOException) {
+          if (LuceneTestCase.random().nextBoolean()) {
+            throw (IOException)e;
+          } else {
+            throw new SolrServerException(e);
+          }
+        } else if (e instanceof SolrServerException) {
+          throw (SolrServerException)e;
         } else {
-          throw new SolrServerException(exception());
+          throw new SolrServerException(e);
         }
       }
       

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c338cf61/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
index 1699b0d..24cf717 100644
--- a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
@@ -16,15 +16,15 @@
  */
 package org.apache.solr.update;
 
-import javax.xml.parsers.ParserConfigurationException;
 import java.io.File;
 import java.io.IOException;
+import java.net.SocketException;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
-
+import javax.xml.parsers.ParserConfigurationException;
 import org.apache.solr.BaseDistributedSearchTestCase;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrQuery;
@@ -34,6 +34,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.LukeRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.ZkCoreNodeProps;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -47,10 +48,12 @@ import org.apache.solr.index.LogDocMergePolicyFactory;
 import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.update.MockStreamingSolrClients.Exp;
 import org.apache.solr.update.SolrCmdDistributor.Error;
+import org.apache.solr.update.SolrCmdDistributor.ForwardNode;
 import org.apache.solr.update.SolrCmdDistributor.Node;
-import org.apache.solr.update.SolrCmdDistributor.RetryNode;
 import org.apache.solr.update.SolrCmdDistributor.StdNode;
 import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.apache.solr.update.processor.DistributedUpdateProcessor.LeaderRequestReplicationTracker;
+import org.apache.solr.update.processor.DistributedUpdateProcessor.RollupRequestReplicationTracker;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -59,6 +62,8 @@ import org.xml.sax.SAXException;
 // See: https://issues.apache.org/jira/browse/SOLR-12028 Tests cannot remove files on Windows machines occasionally
 public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
   
+  private static enum NodeType {FORWARD, STANDARD};
+  
   private AtomicInteger id = new AtomicInteger();
   
   @BeforeClass
@@ -127,6 +132,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
     shards = sb.toString();
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   @ShardsFixed(num = 4)
   public void test() throws Exception {
@@ -325,31 +331,177 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
       }
     }
     
-    testMaxRetries();
-    testOneRetry();
+    testMaxRetries(NodeType.FORWARD);
+    testMaxRetries(NodeType.STANDARD);
+    testOneRetry(NodeType.FORWARD);
+    testOneRetry(NodeType.STANDARD);
     testRetryNodeAgainstBadAddress();
-    testRetryNodeWontRetrySocketError();
-    
+    testStdNodeRetriesSocketError();
+    testForwardNodeWontRetrySocketError();
+    testNodeWontRetryBadRequest(NodeType.FORWARD);
+    testNodeWontRetryBadRequest(NodeType.STANDARD);
+    testMinRfOnRetries(NodeType.FORWARD);
+    testMinRfOnRetries(NodeType.STANDARD);
     testDistribOpenSearcher();
+    testReqShouldRetryNoRetries();
+    testReqShouldRetryMaxRetries();
+    testReqShouldRetryBadRequest();
+    testReqShouldRetryNotFound();
+    testReqShouldRetryDBQ();
+    testDeletes(false, true);
+    testDeletes(false, false);
+    testDeletes(true, true);
+    testDeletes(true, false);
   }
-
-  private void testMaxRetries() throws IOException {
+  
+  private void testDeletes(boolean dbq, boolean withFailures) throws Exception {
+    final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
+    solrclient.commit(true, true);
+    long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults()
+        .getNumFound();
     final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
-    try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 5, 0)) {
-      streamingClients.setExp(Exp.CONNECT_EXCEPTION);
+    try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) {
+      if (withFailures) {
+        streamingClients.setExp(Exp.CONNECT_EXCEPTION);
+      }
       ArrayList<Node> nodes = new ArrayList<>();
-      final HttpSolrClient solrclient1 = (HttpSolrClient) clients.get(0);
+
+      ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(),
+          ZkStateReader.CORE_NAME_PROP, "");
 
       final AtomicInteger retries = new AtomicInteger();
-      ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient1.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
-      RetryNode retryNode = new RetryNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1") {
+      nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
+      Node retryNode = new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 5) {
         @Override
-        public boolean checkRetry() {
+        public boolean checkRetry(Error err) {
+          streamingClients.setExp(null);
           retries.incrementAndGet();
-          return true;
+          return super.checkRetry(err);
         }
       };
 
+
+      nodes.add(retryNode);
+
+      for (int i = 0 ; i < 5 ; i++) {
+        AddUpdateCommand cmd = new AddUpdateCommand(null);
+        int currentId = id.incrementAndGet();
+        cmd.solrDoc = sdoc("id", currentId);
+        ModifiableSolrParams params = new ModifiableSolrParams();
+        cmdDistrib.distribAdd(cmd, nodes, params);
+        DeleteUpdateCommand dcmd = new DeleteUpdateCommand(null);
+        if (dbq) {
+          dcmd.setQuery("id:" + currentId);
+        } else {
+          dcmd.setId(String.valueOf(currentId));
+        }
+        cmdDistrib.distribDelete(dcmd, nodes, params, false, null, null);
+      }
+      
+
+      CommitUpdateCommand ccmd = new CommitUpdateCommand(null, false);
+      cmdDistrib.distribCommit(ccmd, nodes, new ModifiableSolrParams());
+      cmdDistrib.finish();
+      
+      int expectedRetryCount = 0;
+      if (withFailures) {
+        if (dbq) {
+          expectedRetryCount = 1; // just the first cmd would be retried
+        } else {
+          expectedRetryCount = 10;
+        }
+      }
+      assertEquals(expectedRetryCount, retries.get());
+
+
+      long numFoundAfter = solrclient.query(new SolrQuery("*:*")).getResults()
+          .getNumFound();
+
+      // we will get java.net.ConnectException which we retry on
+      assertEquals(numFoundBefore, numFoundAfter);
+      assertEquals(0, cmdDistrib.getErrors().size());
+    }
+  }
+
+  private void testMinRfOnRetries(NodeType nodeType) throws Exception {
+    final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
+    final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
+    try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) {
+      streamingClients.setExp(Exp.CONNECT_EXCEPTION);
+      ArrayList<Node> nodes = new ArrayList<>();
+
+      ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(),
+          ZkStateReader.CORE_NAME_PROP, "");
+
+      final AtomicInteger retries = new AtomicInteger();
+      nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
+      if (nodeType == NodeType.FORWARD) {
+        nodes.add(new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 5) {
+          @Override
+          public boolean checkRetry(Error err) {
+            if (retries.incrementAndGet() >= 3) {
+              streamingClients.setExp(null);
+            }
+            return super.checkRetry(err);
+          }
+        });
+      } else {
+        nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 5) {
+          @Override
+          public boolean checkRetry(Error err) {
+            if (retries.incrementAndGet() >= 3) {
+              streamingClients.setExp(null);
+            }
+            return super.checkRetry(err);
+          }
+        });
+      }
+
+
+      AddUpdateCommand cmd = new AddUpdateCommand(null);
+      cmd.solrDoc = sdoc("id", id.incrementAndGet());
+      ModifiableSolrParams params = new ModifiableSolrParams();
+      RollupRequestReplicationTracker rollupReqTracker = new RollupRequestReplicationTracker("2");
+      LeaderRequestReplicationTracker leaderReqTracker = new LeaderRequestReplicationTracker("shard1", 2);
+
+      cmdDistrib.distribAdd(cmd, nodes, params, false, rollupReqTracker, leaderReqTracker);
+      cmdDistrib.finish();
+      assertEquals(3, retries.get());
+      assertEquals(2, leaderReqTracker.getAchievedRf());// "2" here is because one would be the leader, that creates the instance of LeaderRequestReplicationTracker, the second one is the node
+
+      assertEquals(0, cmdDistrib.getErrors().size());
+    }
+  }
+
+  private void testMaxRetries(NodeType nodeType) throws IOException {
+    final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
+    try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) {
+      streamingClients.setExp(Exp.CONNECT_EXCEPTION);
+      ArrayList<Node> nodes = new ArrayList<>();
+      final HttpSolrClient solrclient1 = (HttpSolrClient) clients.get(0);
+
+      final AtomicInteger retries = new AtomicInteger();
+      ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient1.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
+      Node retryNode;
+      if (nodeType == NodeType.FORWARD) {
+        retryNode = new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 6) {
+          @Override
+          public boolean checkRetry(Error err) {
+            retries.incrementAndGet();
+            return super.checkRetry(err);
+          }
+        };
+      } else {
+        retryNode = new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 6) {
+          @Override
+          public boolean checkRetry(Error err) {
+            retries.incrementAndGet();
+            return super.checkRetry(err);
+          }
+        };
+      }
+      
+
       nodes.add(retryNode);
 
       AddUpdateCommand cmd = new AddUpdateCommand(null);
@@ -359,18 +511,61 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
       cmdDistrib.distribAdd(cmd, nodes, params);
       cmdDistrib.finish();
 
-      assertEquals(6, retries.get());
+      assertEquals(7, retries.get());
 
       assertEquals(1, cmdDistrib.getErrors().size());
     }
   }
   
-  private void testOneRetry() throws Exception {
+  private void testReqShouldRetryNoRetries() {
+    Error err = getError(new SocketException()); 
+    SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 0), new UpdateRequest(), true);
+    assertFalse(req.shouldRetry(err));
+  }
+  
+  private void testReqShouldRetryDBQ() {
+    Error err = getError(new SocketException()); 
+    UpdateRequest dbqReq = new UpdateRequest();
+    dbqReq.deleteByQuery("*:*");
+    SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 1), dbqReq, true);
+    assertFalse(req.shouldRetry(err));
+  }
+  
+  private void testReqShouldRetryMaxRetries() {
+    Error err = getError(new SocketException()); 
+    SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 1), new UpdateRequest(), true);
+    assertTrue(req.shouldRetry(err));
+    req.retries++;
+    assertFalse(req.shouldRetry(err));
+  }
+  
+  private void testReqShouldRetryBadRequest() {
+    Error err = getError(new SolrException(SolrException.ErrorCode.BAD_REQUEST, "bad request")); 
+    SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 1), new UpdateRequest(), true);
+    assertFalse(req.shouldRetry(err));
+  }
+  
+  private void testReqShouldRetryNotFound() {
+    Error err = getError(new SolrException(SolrException.ErrorCode.NOT_FOUND, "not found"));
+    SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 1), new UpdateRequest(), true);
+    assertTrue(req.shouldRetry(err));
+  }
+  
+  private Error getError(Exception e) {
+    Error err = new Error();
+    err.e = e;
+    if (e instanceof SolrException) {
+      err.statusCode = ((SolrException)e).code();
+    }
+    return err;
+  }
+  
+  private void testOneRetry(NodeType nodeType) throws Exception {
     final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
     long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults()
         .getNumFound();
     final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
-    try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 5, 0)) {
+    try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) {
       streamingClients.setExp(Exp.CONNECT_EXCEPTION);
       ArrayList<Node> nodes = new ArrayList<>();
 
@@ -379,14 +574,26 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
 
       final AtomicInteger retries = new AtomicInteger();
       nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
-      RetryNode retryNode = new RetryNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1") {
-        @Override
-        public boolean checkRetry() {
-          streamingClients.setExp(null);
-          retries.incrementAndGet();
-          return true;
-        }
-      };
+      Node retryNode;
+      if (nodeType == NodeType.FORWARD) {
+        retryNode = new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 5) {
+          @Override
+          public boolean checkRetry(Error err) {
+            streamingClients.setExp(null);
+            retries.incrementAndGet();
+            return super.checkRetry(err);
+          }
+        };
+      } else {
+        retryNode = new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 5) {
+          @Override
+          public boolean checkRetry(Error err) {
+            streamingClients.setExp(null);
+            retries.incrementAndGet();
+            return super.checkRetry(err);
+          }
+        };
+      }
 
 
       nodes.add(retryNode);
@@ -412,12 +619,70 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
     }
   }
 
-  private void testRetryNodeWontRetrySocketError() throws Exception {
+  private void testNodeWontRetryBadRequest(NodeType nodeType) throws Exception {
+    ignoreException("Bad Request");
+    final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
+    long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults()
+        .getNumFound();
+    final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
+    try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) {
+      streamingClients.setExp(Exp.BAD_REQUEST);
+      ArrayList<Node> nodes = new ArrayList<>();
+      ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(),
+          ZkStateReader.CORE_NAME_PROP, "");
+
+      final AtomicInteger retries = new AtomicInteger();
+      Node retryNode;
+      if (nodeType == NodeType.FORWARD) {
+        retryNode = new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 5) {
+          @Override
+          public boolean checkRetry(Error err) {
+            retries.incrementAndGet();
+            return super.checkRetry(err);
+          }
+        };
+      } else {
+        retryNode = new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 5) {
+          @Override
+          public boolean checkRetry(Error err) {
+            retries.incrementAndGet();
+            return super.checkRetry(err);
+          }
+        };
+      }
+      nodes.add(retryNode);
+
+      AddUpdateCommand cmd = new AddUpdateCommand(null);
+      cmd.solrDoc = sdoc("id", id.incrementAndGet());
+      ModifiableSolrParams params = new ModifiableSolrParams();
+
+      CommitUpdateCommand ccmd = new CommitUpdateCommand(null, false);
+      cmdDistrib.distribAdd(cmd, nodes, params);
+
+      streamingClients.setExp(null);
+      cmdDistrib.distribCommit(ccmd, nodes, params);
+      cmdDistrib.finish();
+
+      // it will checkRetry, but not actually do it...
+      assertEquals(1, retries.get());
+
+
+      long numFoundAfter = solrclient.query(new SolrQuery("*:*")).getResults()
+          .getNumFound();
+
+      // we will get java.net.SocketException: Network is unreachable, which we don't retry on
+      assertEquals(numFoundBefore, numFoundAfter);
+      assertEquals(1, cmdDistrib.getErrors().size());
+      unIgnoreException("Bad Request");
+    }
+  }
+  
+  private void testForwardNodeWontRetrySocketError() throws Exception {
     final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
     long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults()
         .getNumFound();
     final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
-    try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 5, 0)) {
+    try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) {
       streamingClients.setExp(Exp.SOCKET_EXCEPTION);
       ArrayList<Node> nodes = new ArrayList<>();
 
@@ -426,11 +691,11 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
 
       final AtomicInteger retries = new AtomicInteger();
       nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
-      RetryNode retryNode = new RetryNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1") {
+      ForwardNode retryNode = new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 5) {
         @Override
-        public boolean checkRetry() {
+        public boolean checkRetry(Error err) {
           retries.incrementAndGet();
-          return true;
+          return super.checkRetry(err);
         }
       };
 
@@ -460,6 +725,41 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
       assertEquals(1, cmdDistrib.getErrors().size());
     }
   }
+  
+  private void testStdNodeRetriesSocketError() throws Exception {
+    final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
+    final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
+    try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) {
+      streamingClients.setExp(Exp.SOCKET_EXCEPTION);
+      ArrayList<Node> nodes = new ArrayList<>();
+
+      ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(),
+          ZkStateReader.CORE_NAME_PROP, "");
+
+      final AtomicInteger retries = new AtomicInteger();
+      nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
+      Node retryNode = new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 5) {
+        @Override
+        public boolean checkRetry(Error err) {
+          retries.incrementAndGet();
+          return super.checkRetry(err);
+        }
+      };
+
+
+      nodes.add(retryNode);
+
+      AddUpdateCommand cmd = new AddUpdateCommand(null);
+      cmd.solrDoc = sdoc("id", id.incrementAndGet());
+      ModifiableSolrParams params = new ModifiableSolrParams();
+
+      cmdDistrib.distribAdd(cmd, nodes, params);
+      cmdDistrib.finish();
+
+      // it will checkRetry, but not actually do it...
+      assertEquals(6, retries.get());
+    }
+  }
 
   private void testRetryNodeAgainstBadAddress() throws SolrServerException, IOException {
     // Test RetryNode
@@ -471,14 +771,14 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
       ArrayList<Node> nodes = new ArrayList<>();
 
       ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, "[ff01::114]:33332" + context, ZkStateReader.CORE_NAME_PROP, "");
-      RetryNode retryNode = new RetryNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1") {
+      ForwardNode retryNode = new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 5) {
         @Override
-        public boolean checkRetry() {
+        public boolean checkRetry(Error err) {
           ZkNodeProps leaderProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(),
               ZkStateReader.CORE_NAME_PROP, "");
           this.nodeProps = new ZkCoreNodeProps(leaderProps);
 
-          return true;
+          return super.checkRetry(err);
         }
       };