You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by tf...@apache.org on 2018/08/06 22:57:22 UTC
lucene-solr:master: SOLR-11881: Retry update requests from leaders to
followers
Repository: lucene-solr
Updated Branches:
refs/heads/master ea2e564da -> c338cf61e
SOLR-11881: Retry update requests from leaders to followers
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/c338cf61
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/c338cf61
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/c338cf61
Branch: refs/heads/master
Commit: c338cf61e7baba4908c31e02beda47ae3e201752
Parents: ea2e564
Author: Tomas Fernandez Lobbe <tf...@apache.org>
Authored: Mon Aug 6 15:56:49 2018 -0700
Committer: Tomas Fernandez Lobbe <tf...@apache.org>
Committed: Mon Aug 6 15:56:49 2018 -0700
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../apache/solr/update/SolrCmdDistributor.java | 230 ++++++++----
.../solr/update/StreamingSolrClients.java | 5 +-
.../processor/DistributedUpdateProcessor.java | 36 +-
.../TimeRoutedAliasUpdateProcessor.java | 4 +-
.../solr/update/MockStreamingSolrClients.java | 27 +-
.../solr/update/SolrCmdDistributorTest.java | 370 +++++++++++++++++--
7 files changed, 540 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c338cf61/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 605e837..5fb1f88 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -233,6 +233,8 @@ Optimizations
introduced (splitMethod=link) which uses hard-linking of index files when possible, resulting in
significant speedups and reduced CPU / IO load on shard leader. (ab)
+* SOLR-11881: Retry update requests sent by leaders to their followers (Varun Thacker, Mark Miller, Tomás Fernández Löbbe)
+
Other Changes
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c338cf61/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index 80e2253..d5aafec 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -17,12 +17,28 @@
package org.apache.solr.update;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.net.ConnectException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
import org.apache.http.HttpResponse;
+import org.apache.http.NoHttpResponseException;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient; // jdoc
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
@@ -32,37 +48,21 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.Diagnostics;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
-import org.apache.solr.update.processor.DistributedUpdateProcessor.RollupRequestReplicationTracker;
import org.apache.solr.update.processor.DistributedUpdateProcessor.LeaderRequestReplicationTracker;
+import org.apache.solr.update.processor.DistributedUpdateProcessor.RollupRequestReplicationTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.invoke.MethodHandles;
-import java.net.ConnectException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-
/**
* Used for distributing commands from a shard leader to its replicas.
*/
public class SolrCmdDistributor implements Closeable {
- private static final int MAX_RETRIES_ON_FORWARD = 25;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private StreamingSolrClients clients;
private boolean finished = false; // see finish()
private int retryPause = 500;
- private int maxRetriesOnForward = MAX_RETRIES_ON_FORWARD;
private final List<Error> allErrors = new ArrayList<>();
private final List<Error> errors = Collections.synchronizedList(new ArrayList<Error>());
@@ -79,9 +79,8 @@ public class SolrCmdDistributor implements Closeable {
this.completionService = new ExecutorCompletionService<>(updateShardHandler.getUpdateExecutor());
}
- public SolrCmdDistributor(StreamingSolrClients clients, int maxRetriesOnForward, int retryPause) {
+ public SolrCmdDistributor(StreamingSolrClients clients, int retryPause) {
this.clients = clients;
- this.maxRetriesOnForward = maxRetriesOnForward;
this.retryPause = retryPause;
completionService = new ExecutorCompletionService<>(clients.getUpdateExecutor());
}
@@ -107,55 +106,63 @@ public class SolrCmdDistributor implements Closeable {
List<Error> errors = new ArrayList<>(this.errors);
errors.addAll(clients.getErrors());
List<Error> resubmitList = new ArrayList<>();
+
+ if (log.isInfoEnabled() && errors.size() > 0) {
+ log.info("SolrCmdDistributor found {} errors", errors.size());
+ }
+
+ if (log.isDebugEnabled() && errors.size() > 0) {
+ StringBuilder builder = new StringBuilder("SolrCmdDistributor found:");
+ int maxErrorsToShow = 10;
+ for (Error e:errors) {
+ if (maxErrorsToShow-- <= 0) break;
+ builder.append("\n" + e);
+ }
+ if (errors.size() > 10) {
+ builder.append("\n... and ");
+ builder.append(errors.size() - 10);
+ builder.append(" more");
+ }
+ log.debug(builder.toString());
+ }
for (Error err : errors) {
try {
String oldNodeUrl = err.req.node.getUrl();
- // if there is a retry url, we want to retry...
- boolean isRetry = err.req.node.checkRetry();
-
- boolean doRetry = false;
- int rspCode = err.statusCode;
+ /*
+ * if this is a retryable request we may want to retry, depending on the error we received and
+ * the number of times we have already retried
+ */
+ boolean isRetry = err.req.shouldRetry(err);
if (testing_errorHook != null) Diagnostics.call(testing_errorHook,
err.e);
// this can happen in certain situations such as close
if (isRetry) {
- if (rspCode == 404 || rspCode == 403 || rspCode == 503) {
- doRetry = true;
- }
-
- // if it's a connect exception, lets try again
- if (err.e instanceof SolrServerException) {
- if (((SolrServerException) err.e).getRootCause() instanceof ConnectException) {
- doRetry = true;
- }
- }
-
- if (err.e instanceof ConnectException) {
- doRetry = true;
- }
-
- if (err.req.retries < maxRetriesOnForward && doRetry) {
- err.req.retries++;
-
+ err.req.retries++;
+
+ if (err.req.node instanceof ForwardNode) {
SolrException.log(SolrCmdDistributor.log, "forwarding update to "
+ oldNodeUrl + " failed - retrying ... retries: "
- + err.req.retries + " " + err.req.cmd.toString() + " params:"
- + err.req.uReq.getParams() + " rsp:" + rspCode, err.e);
- try {
- Thread.sleep(retryPause);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.warn(null, e);
- }
-
- resubmitList.add(err);
+ + err.req.retries + "/" + err.req.node.getMaxRetries() + ". "
+ + err.req.cmd.toString() + " params:"
+ + err.req.uReq.getParams() + " rsp:" + err.statusCode, err.e);
} else {
- allErrors.add(err);
+ SolrException.log(SolrCmdDistributor.log, "FROMLEADER request to "
+ + oldNodeUrl + " failed - retrying ... retries: "
+ + err.req.retries + "/" + err.req.node.getMaxRetries() + ". "
+ + err.req.cmd.toString() + " params:"
+ + err.req.uReq.getParams() + " rsp:" + err.statusCode, err.e);
+ }
+ try {
+ Thread.sleep(retryPause); //TODO: Do we want this wait for every error?
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn(null, e);
}
+ resubmitList.add(err);
} else {
allErrors.add(err);
}
@@ -184,6 +191,10 @@ public class SolrCmdDistributor implements Closeable {
RollupRequestReplicationTracker rollupTracker,
LeaderRequestReplicationTracker leaderTracker) throws IOException {
+ if (!cmd.isDeleteById()) {
+ blockAndDoRetries(); // For DBQ, flush all writes before submitting
+ }
+
for (Node node : nodes) {
UpdateRequest uReq = new UpdateRequest();
uReq.setParams(params);
@@ -193,7 +204,6 @@ public class SolrCmdDistributor implements Closeable {
} else {
uReq.deleteByQuery(cmd.query);
}
-
submit(new Req(cmd, node, uReq, sync, rollupTracker, leaderTracker), false);
}
}
@@ -346,6 +356,16 @@ public class SolrCmdDistributor implements Closeable {
this.leaderTracker = leaderTracker;
}
+ /**
+ * @return true if this request should be retried after receiving a particular error
+ * false otherwise
+ */
+ public boolean shouldRetry(Error err) {
+ boolean isRetry = node.checkRetry(err);
+ isRetry &= uReq.getDeleteQuery() == null || uReq.getDeleteQuery().isEmpty(); //Don't retry DBQs
+ return isRetry && retries < node.getMaxRetries();
+ }
+
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("SolrCmdDistributor$Req: cmd=").append(cmd.toString());
@@ -440,27 +460,36 @@ public class SolrCmdDistributor implements Closeable {
public static abstract class Node {
public abstract String getUrl();
- public abstract boolean checkRetry();
+ public abstract boolean checkRetry(Error e);
public abstract String getCoreName();
public abstract String getBaseUrl();
public abstract ZkCoreNodeProps getNodeProps();
public abstract String getCollection();
public abstract String getShardId();
+ public abstract int getMaxRetries();
}
public static class StdNode extends Node {
protected ZkCoreNodeProps nodeProps;
protected String collection;
protected String shardId;
+ private final boolean retry;
+ private final int maxRetries;
public StdNode(ZkCoreNodeProps nodeProps) {
- this(nodeProps, null, null);
+ this(nodeProps, null, null, 0);
}
- public StdNode(ZkCoreNodeProps nodeProps, String collection, String shardId) {
+ public StdNode(ZkCoreNodeProps nodeProps, String collection, String shardId) {
+ this(nodeProps, collection, shardId, 0);
+ }
+
+ public StdNode(ZkCoreNodeProps nodeProps, String collection, String shardId, int maxRetries) {
this.nodeProps = nodeProps;
this.collection = collection;
this.shardId = shardId;
+ this.retry = maxRetries > 0;
+ this.maxRetries = maxRetries;
}
public String getCollection() {
@@ -482,9 +511,33 @@ public class SolrCmdDistributor implements Closeable {
}
@Override
- public boolean checkRetry() {
+ public boolean checkRetry(Error err) {
+ if (!retry) return false;
+
+ if (err.statusCode == 404 || err.statusCode == 403 || err.statusCode == 503) {
+ return true;
+ }
+
+ // if it's a retriable network exception (see isRetriableException), let's try again
+ if (err.e instanceof SolrServerException) {
+ if (isRetriableException(((SolrServerException) err.e).getRootCause())) {
+ return true;
+ }
+ } else {
+ if (isRetriableException(err.e)) {
+ return true;
+ }
+ }
return false;
}
+
+ /**
+ * @return true if Solr should retry in case of hitting this exception
+ * false otherwise
+ */
+ private boolean isRetriableException(Throwable t) {
+ return t instanceof SocketException || t instanceof NoHttpResponseException || t instanceof SocketTimeoutException;
+ }
@Override
public String getBaseUrl() {
@@ -506,6 +559,8 @@ public class SolrCmdDistributor implements Closeable {
result = prime * result + ((baseUrl == null) ? 0 : baseUrl.hashCode());
result = prime * result + ((coreName == null) ? 0 : coreName.hashCode());
result = prime * result + ((url == null) ? 0 : url.hashCode());
+ result = prime * result + Boolean.hashCode(retry);
+ result = prime * result + Integer.hashCode(maxRetries);
return result;
}
@@ -515,6 +570,8 @@ public class SolrCmdDistributor implements Closeable {
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
StdNode other = (StdNode) obj;
+ if (this.retry != other.retry) return false;
+ if (this.maxRetries != other.maxRetries) return false;
String baseUrl = nodeProps.getBaseUrl();
String coreName = nodeProps.getCoreName();
String url = nodeProps.getCoreUrl();
@@ -534,39 +591,56 @@ public class SolrCmdDistributor implements Closeable {
public ZkCoreNodeProps getNodeProps() {
return nodeProps;
}
+
+ @Override
+ public int getMaxRetries() {
+ return this.maxRetries;
+ }
}
// ForwardNodes are used in the case of 'forward to leader' where we want
// to try the latest leader on a fail in the case the leader just went down.
- public static class RetryNode extends StdNode {
+ public static class ForwardNode extends StdNode {
private ZkStateReader zkStateReader;
- public RetryNode(ZkCoreNodeProps nodeProps, ZkStateReader zkStateReader, String collection, String shardId) {
- super(nodeProps, collection, shardId);
+ public ForwardNode(ZkCoreNodeProps nodeProps, ZkStateReader zkStateReader, String collection, String shardId, int maxRetries) {
+ super(nodeProps, collection, shardId, maxRetries);
this.zkStateReader = zkStateReader;
this.collection = collection;
this.shardId = shardId;
}
@Override
- public boolean checkRetry() {
- ZkCoreNodeProps leaderProps;
- try {
- leaderProps = new ZkCoreNodeProps(zkStateReader.getLeaderRetry(
- collection, shardId));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return false;
- } catch (Exception e) {
- // we retry with same info
- log.warn(null, e);
- return true;
+ public boolean checkRetry(Error err) {
+ boolean doRetry = false;
+ if (err.statusCode == 404 || err.statusCode == 403 || err.statusCode == 503) {
+ doRetry = true;
}
-
- this.nodeProps = leaderProps;
- return true;
+ // if it's a connect exception, lets try again
+ if (err.e instanceof SolrServerException && ((SolrServerException) err.e).getRootCause() instanceof ConnectException) {
+ doRetry = true;
+ } else if (err.e instanceof ConnectException) {
+ doRetry = true;
+ }
+ if (doRetry) {
+ ZkCoreNodeProps leaderProps;
+ try {
+ leaderProps = new ZkCoreNodeProps(zkStateReader.getLeaderRetry(
+ collection, shardId));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ } catch (Exception e) {
+ // we retry with same info
+ log.warn(null, e);
+ return true;
+ }
+
+ this.nodeProps = leaderProps;
+ }
+ return doRetry;
}
@Override
@@ -584,7 +658,7 @@ public class SolrCmdDistributor implements Closeable {
if (this == obj) return true;
if (!super.equals(obj)) return false;
if (getClass() != obj.getClass()) return false;
- RetryNode other = (RetryNode) obj;
+ ForwardNode other = (ForwardNode) obj;
if (nodeProps.getCoreUrl() == null) {
if (other.nodeProps.getCoreUrl() != null) return false;
} else if (!nodeProps.getCoreUrl().equals(other.nodeProps.getCoreUrl())) return false;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c338cf61/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
index eb4caec..eb92540 100644
--- a/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
+++ b/solr/core/src/java/org/apache/solr/update/StreamingSolrClients.java
@@ -143,7 +143,6 @@ class ErrorReportingConcurrentUpdateSolrClient extends ConcurrentUpdateSolrClien
@Override
public void handleError(Throwable ex) {
- req.trackRequestResult(null, false);
log.error("error", ex);
Error error = new Error();
error.e = (Exception) ex;
@@ -152,6 +151,10 @@ class ErrorReportingConcurrentUpdateSolrClient extends ConcurrentUpdateSolrClien
}
error.req = req;
errors.add(error);
+ if (!req.shouldRetry(error)) {
+ // only track the error if we are not retrying the request
+ req.trackRequestResult(null, false);
+ }
}
@Override
public void onSuccess(HttpResponse resp) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c338cf61/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index a21d906..e1e7968 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -83,7 +83,7 @@ import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.SolrCmdDistributor;
import org.apache.solr.update.SolrCmdDistributor.Error;
import org.apache.solr.update.SolrCmdDistributor.Node;
-import org.apache.solr.update.SolrCmdDistributor.RetryNode;
+import org.apache.solr.update.SolrCmdDistributor.ForwardNode;
import org.apache.solr.update.SolrCmdDistributor.StdNode;
import org.apache.solr.update.SolrIndexSplitter;
import org.apache.solr.update.UpdateCommand;
@@ -114,6 +114,16 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/**
+ * Requests forwarded to a leader of a different shard will be retried up to this number of times by default
+ */
+ static final int MAX_RETRIES_ON_FORWARD_DEAULT = 25;
+
+ /**
+ * Requests from a leader to its followers will be retried this number of times by default
+ */
+ static final int MAX_RETRIES_TO_FOLLOWERS_DEFAULT = 3;
+
+ /**
* Values this processor supports for the <code>DISTRIB_UPDATE_PARAM</code>.
* This is an implementation detail exposed solely for tests.
*
@@ -175,6 +185,15 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private List<Node> nodes;
private Set<String> skippedCoreNodeNames;
private boolean isIndexChanged = false;
+
+ /**
+ * Number of times requests forwarded to some other shard's leader can be retried
+ */
+ private final int maxRetriesOnForward = MAX_RETRIES_ON_FORWARD_DEAULT;
+ /**
+ * Number of times requests from leaders to followers can be retried
+ */
+ private final int maxRetriesToFollowers = MAX_RETRIES_TO_FOLLOWERS_DEFAULT;
private UpdateCommand updateCommand; // the current command this processor is working on.
@@ -368,7 +387,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
} else if (!clusterState.getLiveNodes().contains(replica.getNodeName()) || replica.getState() == Replica.State.DOWN) {
skippedCoreNodeNames.add(replica.getName());
} else {
- nodes.add(new StdNode(new ZkCoreNodeProps(replica), collection, shardId));
+ nodes.add(new StdNode(new ZkCoreNodeProps(replica), collection, shardId, maxRetriesToFollowers));
}
}
return nodes;
@@ -377,7 +396,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// I need to forward on to the leader...
forwardToLeader = true;
return Collections.singletonList(
- new RetryNode(new ZkCoreNodeProps(leaderReplica), zkController.getZkStateReader(), collection, shardId));
+ new ForwardNode(new ZkCoreNodeProps(leaderReplica), zkController.getZkStateReader(), collection, shardId, maxRetriesOnForward));
}
} catch (InterruptedException e) {
@@ -779,7 +798,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
Set<String> replicasShouldBeInLowerTerms = new HashSet<>();
for (final SolrCmdDistributor.Error error : errors) {
- if (error.req.node instanceof RetryNode) {
+ if (error.req.node instanceof ForwardNode) {
// if it's a forward, any fail is a problem -
// otherwise we assume things are fine if we got it locally
// until we start allowing min replication param
@@ -1538,7 +1557,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// don't forward to ourself
leaderForAnyShard = true;
} else {
- leaders.add(new RetryNode(coreLeaderProps, zkController.getZkStateReader(), collection, sliceName));
+ leaders.add(new ForwardNode(coreLeaderProps, zkController.getZkStateReader(), collection, sliceName, maxRetriesOnForward));
}
}
@@ -2100,7 +2119,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private int achievedRf = Integer.MAX_VALUE;
private final int requestedRf;
- RollupRequestReplicationTracker(String minRepFact) {
+ public RollupRequestReplicationTracker(String minRepFact) {
try {
this.requestedRf = Integer.parseInt(minRepFact);
} catch (NumberFormatException nfe) {
@@ -2153,7 +2172,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return requestedRf;
}
- LeaderRequestReplicationTracker(String shardId, int requestedRf) {
+ public LeaderRequestReplicationTracker(String shardId, int requestedRf) {
this.requestedRf = requestedRf;
this.myShardId = shardId;
}
@@ -2165,8 +2184,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
public void trackRequestResult(Node node, boolean success) {
if (log.isDebugEnabled()) {
- log.debug("trackRequestResult(" + node + "): success? " + success +
- ", shardId=" + myShardId);
+ log.debug("trackRequestResult({}): success? {}, shardId={}", node, success, myShardId);
}
if (success) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c338cf61/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
index 1d2d730..cd4ed00 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java
@@ -401,8 +401,8 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
"No 'leader' replica available for shard " + slice.getName() + " of collection " + collection);
}
- return new SolrCmdDistributor.RetryNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(),
- collection, slice.getName());
+ return new SolrCmdDistributor.ForwardNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(),
+ collection, slice.getName(), DistributedUpdateProcessor.MAX_RETRIES_ON_FORWARD_DEAULT);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c338cf61/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java b/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java
index 72d39ff..c269c9e 100644
--- a/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java
+++ b/solr/core/src/test/org/apache/solr/update/MockStreamingSolrClients.java
@@ -16,19 +16,19 @@
*/
package org.apache.solr.update;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketException;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.NamedList;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.SocketException;
-
public class MockStreamingSolrClients extends StreamingSolrClients {
- public enum Exp {CONNECT_EXCEPTION, SOCKET_EXCEPTION};
+ public enum Exp {CONNECT_EXCEPTION, SOCKET_EXCEPTION, BAD_REQUEST};
private volatile Exp exp = null;
@@ -46,12 +46,14 @@ public class MockStreamingSolrClients extends StreamingSolrClients {
this.exp = exp;
}
- private IOException exception() {
+ private Exception exception() {
switch (exp) {
case CONNECT_EXCEPTION:
return new ConnectException();
case SOCKET_EXCEPTION:
return new SocketException();
+ case BAD_REQUEST:
+ return new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Bad Request");
default:
break;
}
@@ -70,10 +72,17 @@ public class MockStreamingSolrClients extends StreamingSolrClients {
public NamedList<Object> request(SolrRequest request, String collection)
throws SolrServerException, IOException {
if (exp != null) {
- if (LuceneTestCase.random().nextBoolean()) {
- throw exception();
+ Exception e = exception();
+ if (e instanceof IOException) {
+ if (LuceneTestCase.random().nextBoolean()) {
+ throw (IOException)e;
+ } else {
+ throw new SolrServerException(e);
+ }
+ } else if (e instanceof SolrServerException) {
+ throw (SolrServerException)e;
} else {
- throw new SolrServerException(exception());
+ throw new SolrServerException(e);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/c338cf61/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
index 1699b0d..24cf717 100644
--- a/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
+++ b/solr/core/src/test/org/apache/solr/update/SolrCmdDistributorTest.java
@@ -16,15 +16,15 @@
*/
package org.apache.solr.update;
-import javax.xml.parsers.ParserConfigurationException;
import java.io.File;
import java.io.IOException;
+import java.net.SocketException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
-
+import javax.xml.parsers.ParserConfigurationException;
import org.apache.solr.BaseDistributedSearchTestCase;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
@@ -34,6 +34,7 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.LukeRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -47,10 +48,12 @@ import org.apache.solr.index.LogDocMergePolicyFactory;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.MockStreamingSolrClients.Exp;
import org.apache.solr.update.SolrCmdDistributor.Error;
+import org.apache.solr.update.SolrCmdDistributor.ForwardNode;
import org.apache.solr.update.SolrCmdDistributor.Node;
-import org.apache.solr.update.SolrCmdDistributor.RetryNode;
import org.apache.solr.update.SolrCmdDistributor.StdNode;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.apache.solr.update.processor.DistributedUpdateProcessor.LeaderRequestReplicationTracker;
+import org.apache.solr.update.processor.DistributedUpdateProcessor.RollupRequestReplicationTracker;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -59,6 +62,8 @@ import org.xml.sax.SAXException;
// See: https://issues.apache.org/jira/browse/SOLR-12028 Tests cannot remove files on Windows machines occasionally
public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
+ private static enum NodeType {FORWARD, STANDARD};
+
private AtomicInteger id = new AtomicInteger();
@BeforeClass
@@ -127,6 +132,7 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
shards = sb.toString();
}
+ @SuppressWarnings("unchecked")
@Test
@ShardsFixed(num = 4)
public void test() throws Exception {
@@ -325,31 +331,177 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
}
}
- testMaxRetries();
- testOneRetry();
+ testMaxRetries(NodeType.FORWARD);
+ testMaxRetries(NodeType.STANDARD);
+ testOneRetry(NodeType.FORWARD);
+ testOneRetry(NodeType.STANDARD);
testRetryNodeAgainstBadAddress();
- testRetryNodeWontRetrySocketError();
-
+ testStdNodeRetriesSocketError();
+ testForwardNodeWontRetrySocketError();
+ testNodeWontRetryBadRequest(NodeType.FORWARD);
+ testNodeWontRetryBadRequest(NodeType.STANDARD);
+ testMinRfOnRetries(NodeType.FORWARD);
+ testMinRfOnRetries(NodeType.STANDARD);
testDistribOpenSearcher();
+ testReqShouldRetryNoRetries();
+ testReqShouldRetryMaxRetries();
+ testReqShouldRetryBadRequest();
+ testReqShouldRetryNotFound();
+ testReqShouldRetryDBQ();
+ testDeletes(false, true);
+ testDeletes(false, false);
+ testDeletes(true, true);
+ testDeletes(true, false);
}
-
- private void testMaxRetries() throws IOException {
+
+ private void testDeletes(boolean dbq, boolean withFailures) throws Exception {
+ final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
+ solrclient.commit(true, true);
+ long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults()
+ .getNumFound();
final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
- try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 5, 0)) {
- streamingClients.setExp(Exp.CONNECT_EXCEPTION);
+ try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) {
+ if (withFailures) {
+ streamingClients.setExp(Exp.CONNECT_EXCEPTION);
+ }
ArrayList<Node> nodes = new ArrayList<>();
- final HttpSolrClient solrclient1 = (HttpSolrClient) clients.get(0);
+
+ ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(),
+ ZkStateReader.CORE_NAME_PROP, "");
final AtomicInteger retries = new AtomicInteger();
- ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient1.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
- RetryNode retryNode = new RetryNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1") {
+ nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
+ Node retryNode = new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 5) {
@Override
- public boolean checkRetry() {
+ public boolean checkRetry(Error err) {
+ streamingClients.setExp(null);
retries.incrementAndGet();
- return true;
+ return super.checkRetry(err);
}
};
+
+ nodes.add(retryNode);
+
+ for (int i = 0 ; i < 5 ; i++) {
+ AddUpdateCommand cmd = new AddUpdateCommand(null);
+ int currentId = id.incrementAndGet();
+ cmd.solrDoc = sdoc("id", currentId);
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ cmdDistrib.distribAdd(cmd, nodes, params);
+ DeleteUpdateCommand dcmd = new DeleteUpdateCommand(null);
+ if (dbq) {
+ dcmd.setQuery("id:" + currentId);
+ } else {
+ dcmd.setId(String.valueOf(currentId));
+ }
+ cmdDistrib.distribDelete(dcmd, nodes, params, false, null, null);
+ }
+
+
+ CommitUpdateCommand ccmd = new CommitUpdateCommand(null, false);
+ cmdDistrib.distribCommit(ccmd, nodes, new ModifiableSolrParams());
+ cmdDistrib.finish();
+
+ int expectedRetryCount = 0;
+ if (withFailures) {
+ if (dbq) {
+ expectedRetryCount = 1; // just the first cmd would be retried
+ } else {
+ expectedRetryCount = 10;
+ }
+ }
+ assertEquals(expectedRetryCount, retries.get());
+
+
+ long numFoundAfter = solrclient.query(new SolrQuery("*:*")).getResults()
+ .getNumFound();
+
+ // we will get java.net.ConnectException which we retry on
+ assertEquals(numFoundBefore, numFoundAfter);
+ assertEquals(0, cmdDistrib.getErrors().size());
+ }
+ }
+
+ private void testMinRfOnRetries(NodeType nodeType) throws Exception {
+ final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
+ final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
+ try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) {
+ streamingClients.setExp(Exp.CONNECT_EXCEPTION);
+ ArrayList<Node> nodes = new ArrayList<>();
+
+ ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(),
+ ZkStateReader.CORE_NAME_PROP, "");
+
+ final AtomicInteger retries = new AtomicInteger();
+ nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
+ if (nodeType == NodeType.FORWARD) {
+ nodes.add(new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 5) {
+ @Override
+ public boolean checkRetry(Error err) {
+ if (retries.incrementAndGet() >= 3) {
+ streamingClients.setExp(null);
+ }
+ return super.checkRetry(err);
+ }
+ });
+ } else {
+ nodes.add(new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 5) {
+ @Override
+ public boolean checkRetry(Error err) {
+ if (retries.incrementAndGet() >= 3) {
+ streamingClients.setExp(null);
+ }
+ return super.checkRetry(err);
+ }
+ });
+ }
+
+
+ AddUpdateCommand cmd = new AddUpdateCommand(null);
+ cmd.solrDoc = sdoc("id", id.incrementAndGet());
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ RollupRequestReplicationTracker rollupReqTracker = new RollupRequestReplicationTracker("2");
+ LeaderRequestReplicationTracker leaderReqTracker = new LeaderRequestReplicationTracker("shard1", 2);
+
+ cmdDistrib.distribAdd(cmd, nodes, params, false, rollupReqTracker, leaderReqTracker);
+ cmdDistrib.finish();
+ assertEquals(3, retries.get());
+ assertEquals(2, leaderReqTracker.getAchievedRf());// "2" here is because one would be the leader, that creates the instance of LeaderRequestReplicationTracker, the second one is the node
+
+ assertEquals(0, cmdDistrib.getErrors().size());
+ }
+ }
+
+ private void testMaxRetries(NodeType nodeType) throws IOException {
+ final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
+ try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) {
+ streamingClients.setExp(Exp.CONNECT_EXCEPTION);
+ ArrayList<Node> nodes = new ArrayList<>();
+ final HttpSolrClient solrclient1 = (HttpSolrClient) clients.get(0);
+
+ final AtomicInteger retries = new AtomicInteger();
+ ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient1.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
+ Node retryNode;
+ if (nodeType == NodeType.FORWARD) {
+ retryNode = new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 6) {
+ @Override
+ public boolean checkRetry(Error err) {
+ retries.incrementAndGet();
+ return super.checkRetry(err);
+ }
+ };
+ } else {
+ retryNode = new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 6) {
+ @Override
+ public boolean checkRetry(Error err) {
+ retries.incrementAndGet();
+ return super.checkRetry(err);
+ }
+ };
+ }
+
+
nodes.add(retryNode);
AddUpdateCommand cmd = new AddUpdateCommand(null);
@@ -359,18 +511,61 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
cmdDistrib.distribAdd(cmd, nodes, params);
cmdDistrib.finish();
- assertEquals(6, retries.get());
+ assertEquals(7, retries.get());
assertEquals(1, cmdDistrib.getErrors().size());
}
}
- private void testOneRetry() throws Exception {
+ private void testReqShouldRetryNoRetries() {
+ Error err = getError(new SocketException());
+ SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 0), new UpdateRequest(), true);
+ assertFalse(req.shouldRetry(err));
+ }
+
+ private void testReqShouldRetryDBQ() {
+ Error err = getError(new SocketException());
+ UpdateRequest dbqReq = new UpdateRequest();
+ dbqReq.deleteByQuery("*:*");
+ SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 1), dbqReq, true);
+ assertFalse(req.shouldRetry(err));
+ }
+
+ private void testReqShouldRetryMaxRetries() {
+ Error err = getError(new SocketException());
+ SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 1), new UpdateRequest(), true);
+ assertTrue(req.shouldRetry(err));
+ req.retries++;
+ assertFalse(req.shouldRetry(err));
+ }
+
+ private void testReqShouldRetryBadRequest() {
+ Error err = getError(new SolrException(SolrException.ErrorCode.BAD_REQUEST, "bad request"));
+ SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 1), new UpdateRequest(), true);
+ assertFalse(req.shouldRetry(err));
+ }
+
+ private void testReqShouldRetryNotFound() {
+ Error err = getError(new SolrException(SolrException.ErrorCode.NOT_FOUND, "not found"));
+ SolrCmdDistributor.Req req = new SolrCmdDistributor.Req(null, new StdNode(null, "collection1", "shard1", 1), new UpdateRequest(), true);
+ assertTrue(req.shouldRetry(err));
+ }
+
+ private Error getError(Exception e) {
+ Error err = new Error();
+ err.e = e;
+ if (e instanceof SolrException) {
+ err.statusCode = ((SolrException)e).code();
+ }
+ return err;
+ }
+
+ private void testOneRetry(NodeType nodeType) throws Exception {
final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults()
.getNumFound();
final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
- try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 5, 0)) {
+ try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) {
streamingClients.setExp(Exp.CONNECT_EXCEPTION);
ArrayList<Node> nodes = new ArrayList<>();
@@ -379,14 +574,26 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
final AtomicInteger retries = new AtomicInteger();
nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
- RetryNode retryNode = new RetryNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1") {
- @Override
- public boolean checkRetry() {
- streamingClients.setExp(null);
- retries.incrementAndGet();
- return true;
- }
- };
+ Node retryNode;
+ if (nodeType == NodeType.FORWARD) {
+ retryNode = new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 5) {
+ @Override
+ public boolean checkRetry(Error err) {
+ streamingClients.setExp(null);
+ retries.incrementAndGet();
+ return super.checkRetry(err);
+ }
+ };
+ } else {
+ retryNode = new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 5) {
+ @Override
+ public boolean checkRetry(Error err) {
+ streamingClients.setExp(null);
+ retries.incrementAndGet();
+ return super.checkRetry(err);
+ }
+ };
+ }
nodes.add(retryNode);
@@ -412,12 +619,70 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
}
}
- private void testRetryNodeWontRetrySocketError() throws Exception {
+ private void testNodeWontRetryBadRequest(NodeType nodeType) throws Exception {
+ ignoreException("Bad Request");
+ final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
+ long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults()
+ .getNumFound();
+ final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
+ try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) {
+ streamingClients.setExp(Exp.BAD_REQUEST);
+ ArrayList<Node> nodes = new ArrayList<>();
+ ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(),
+ ZkStateReader.CORE_NAME_PROP, "");
+
+ final AtomicInteger retries = new AtomicInteger();
+ Node retryNode;
+ if (nodeType == NodeType.FORWARD) {
+ retryNode = new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 5) {
+ @Override
+ public boolean checkRetry(Error err) {
+ retries.incrementAndGet();
+ return super.checkRetry(err);
+ }
+ };
+ } else {
+ retryNode = new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 5) {
+ @Override
+ public boolean checkRetry(Error err) {
+ retries.incrementAndGet();
+ return super.checkRetry(err);
+ }
+ };
+ }
+ nodes.add(retryNode);
+
+ AddUpdateCommand cmd = new AddUpdateCommand(null);
+ cmd.solrDoc = sdoc("id", id.incrementAndGet());
+ ModifiableSolrParams params = new ModifiableSolrParams();
+
+ CommitUpdateCommand ccmd = new CommitUpdateCommand(null, false);
+ cmdDistrib.distribAdd(cmd, nodes, params);
+
+ streamingClients.setExp(null);
+ cmdDistrib.distribCommit(ccmd, nodes, params);
+ cmdDistrib.finish();
+
+ // it will checkRetry, but not actually do it...
+ assertEquals(1, retries.get());
+
+
+ long numFoundAfter = solrclient.query(new SolrQuery("*:*")).getResults()
+ .getNumFound();
+
+ // the mock client fails the add with a Bad Request error (Exp.BAD_REQUEST), which we don't retry on
+ assertEquals(numFoundBefore, numFoundAfter);
+ assertEquals(1, cmdDistrib.getErrors().size());
+ unIgnoreException("Bad Request");
+ }
+ }
+
+ private void testForwardNodeWontRetrySocketError() throws Exception {
final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
long numFoundBefore = solrclient.query(new SolrQuery("*:*")).getResults()
.getNumFound();
final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
- try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 5, 0)) {
+ try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) {
streamingClients.setExp(Exp.SOCKET_EXCEPTION);
ArrayList<Node> nodes = new ArrayList<>();
@@ -426,11 +691,11 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
final AtomicInteger retries = new AtomicInteger();
nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
- RetryNode retryNode = new RetryNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1") {
+ ForwardNode retryNode = new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 5) {
@Override
- public boolean checkRetry() {
+ public boolean checkRetry(Error err) {
retries.incrementAndGet();
- return true;
+ return super.checkRetry(err);
}
};
@@ -460,6 +725,41 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
assertEquals(1, cmdDistrib.getErrors().size());
}
}
+
+ private void testStdNodeRetriesSocketError() throws Exception {
+ final HttpSolrClient solrclient = (HttpSolrClient) clients.get(0);
+ final MockStreamingSolrClients streamingClients = new MockStreamingSolrClients(updateShardHandler);
+ try (SolrCmdDistributor cmdDistrib = new SolrCmdDistributor(streamingClients, 0)) {
+ streamingClients.setExp(Exp.SOCKET_EXCEPTION);
+ ArrayList<Node> nodes = new ArrayList<>();
+
+ ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(),
+ ZkStateReader.CORE_NAME_PROP, "");
+
+ final AtomicInteger retries = new AtomicInteger();
+ nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(), ZkStateReader.CORE_NAME_PROP, "");
+ Node retryNode = new StdNode(new ZkCoreNodeProps(nodeProps), "collection1", "shard1", 5) {
+ @Override
+ public boolean checkRetry(Error err) {
+ retries.incrementAndGet();
+ return super.checkRetry(err);
+ }
+ };
+
+
+ nodes.add(retryNode);
+
+ AddUpdateCommand cmd = new AddUpdateCommand(null);
+ cmd.solrDoc = sdoc("id", id.incrementAndGet());
+ ModifiableSolrParams params = new ModifiableSolrParams();
+
+ cmdDistrib.distribAdd(cmd, nodes, params);
+ cmdDistrib.finish();
+
+ // StdNode does retry socket errors: checkRetry is consulted once per failed attempt (the initial request plus the 5 allowed retries)
+ assertEquals(6, retries.get());
+ }
+ }
private void testRetryNodeAgainstBadAddress() throws SolrServerException, IOException {
// Test RetryNode
@@ -471,14 +771,14 @@ public class SolrCmdDistributorTest extends BaseDistributedSearchTestCase {
ArrayList<Node> nodes = new ArrayList<>();
ZkNodeProps nodeProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, "[ff01::114]:33332" + context, ZkStateReader.CORE_NAME_PROP, "");
- RetryNode retryNode = new RetryNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1") {
+ ForwardNode retryNode = new ForwardNode(new ZkCoreNodeProps(nodeProps), null, "collection1", "shard1", 5) {
@Override
- public boolean checkRetry() {
+ public boolean checkRetry(Error err) {
ZkNodeProps leaderProps = new ZkNodeProps(ZkStateReader.BASE_URL_PROP, solrclient.getBaseURL(),
ZkStateReader.CORE_NAME_PROP, "");
this.nodeProps = new ZkCoreNodeProps(leaderProps);
- return true;
+ return super.checkRetry(err);
}
};