You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@lucene.apache.org by Robert Muir <rc...@gmail.com> on 2020/12/16 10:14:41 UTC
Re: [lucene-solr] branch branch_8x updated: SOLR-15029 Trigger leader
election on index writer tragedy
Can we please add proper javadoc and the @lucene.internal tag when making
methods public, especially in core classes such as IndexWriter, as in this commit?
On Tue, Dec 15, 2020 at 5:04 PM <md...@apache.org> wrote:
>
> This is an automated email from the ASF dual-hosted git repository.
>
> mdrob pushed a commit to branch branch_8x
> in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
>
>
> The following commit(s) were added to refs/heads/branch_8x by this push:
> new b090971 SOLR-15029 Trigger leader election on index writer tragedy
> b090971 is described below
>
> commit b090971259f57973941d70d13612e22985a09a8d
> Author: Mike Drob <md...@apple.com>
> AuthorDate: Fri Dec 4 15:19:49 2020 -0800
>
> SOLR-15029 Trigger leader election on index writer tragedy
>
> SOLR-13027 Use TestInjection so that we always have a Tragic Event
>
> When we encounter a tragic error in the index writer, we can trigger a
> leader election instead of queuing up a delete and re-add of the node in
> question. This should result in a more graceful transition, and the
> previous leader will eventually be put into recovery by a new leader.
>
> Backport removes additional logging from ShardTerms.save because we do
> not have StackWalker in Java 8.
> ---
> .../java/org/apache/lucene/index/IndexWriter.java | 7 +-
> .../java/org/apache/solr/cloud/LeaderElector.java | 11 +-
> .../org/apache/solr/cloud/RecoveryStrategy.java | 157 +++++++-----------
> .../solr/cloud/ShardLeaderElectionContext.java | 8 +-
> .../java/org/apache/solr/cloud/ZkController.java | 83 ++++++----
> .../java/org/apache/solr/cloud/ZkShardTerms.java | 76 ++++-----
> .../java/org/apache/solr/core/CoreContainer.java | 10 +-
> .../apache/solr/handler/RequestHandlerBase.java | 2 +
> .../solr/handler/admin/CollectionsHandler.java | 1 +
> .../solr/handler/admin/RebalanceLeaders.java | 2 +-
> .../org/apache/solr/servlet/ResponseUtils.java | 2 +-
> .../java/org/apache/solr/util/TestInjection.java | 39 +++++
> .../apache/solr/cloud/LeaderTragicEventTest.java | 177 +++++++++------------
> .../test/org/apache/solr/cloud/ShardTermsTest.java | 48 ++++++
> .../org/apache/solr/cloud/ZkShardTermsTest.java | 8 -
> .../apache/solr/client/solrj/cloud/ShardTerms.java | 19 ++-
> 16 files changed, 338 insertions(+), 312 deletions(-)
>
> diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
> index 06ab80e..5e3182a 100644
> --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
> +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
> @@ -5172,11 +5172,12 @@ public class IndexWriter implements Closeable, TwoPhaseCommit, Accountable,
> }
>
> /**
> - * This method should be called on a tragic event ie. if a downstream class of the writer
> + * <p>This method should be called on a tragic event ie. if a downstream class of the writer
> * hits an unrecoverable exception. This method does not rethrow the tragic event exception.
> - * Note: This method will not close the writer but can be called from any location without respecting any lock order
> + * <p>Note: This method will not close the writer but can be called from any location without respecting any lock order
> + * <p>This method is visible for testing, and is not expected to be called by client code
> */
> - private void onTragicEvent(Throwable tragedy, String location) {
> + public void onTragicEvent(Throwable tragedy, String location) {
> // This is not supposed to be tragic: IW is supposed to catch this and
> // ignore, because it means we asked the merge to abort:
> assert tragedy instanceof MergePolicy.MergeAbortedException == false;
> diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
> index f50aa11..e55ce2b 100644
> --- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
> +++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
> @@ -18,10 +18,11 @@ package org.apache.solr.cloud;
>
> import java.io.IOException;
> import java.lang.invoke.MethodHandles;
> -import java.util.Collections;
> +import java.util.Comparator;
> import java.util.Iterator;
> import java.util.List;
> import java.util.Map;
> +import java.util.function.Function;
> import java.util.regex.Matcher;
> import java.util.regex.Pattern;
>
> @@ -42,7 +43,7 @@ import org.slf4j.LoggerFactory;
>
> /**
> * Leader Election process. This class contains the logic by which a
> - * leader is chosen. First call * {@link #setup(ElectionContext)} to ensure
> + * leader is chosen. First call {@link #setup(ElectionContext)} to ensure
> * the election process is init'd. Next call
> * {@link #joinElection(ElectionContext, boolean)} to start the leader election.
> *
> @@ -166,7 +167,6 @@ public class LeaderElector {
> }
> }
>
> - // TODO: get this core param out of here
> protected void runIamLeaderProcess(final ElectionContext context, boolean weAreReplacement) throws KeeperException,
> InterruptedException, IOException {
> context.runLeaderProcess(weAreReplacement,0);
> @@ -378,10 +378,7 @@ public class LeaderElector {
> * Sort n string sequence list.
> */
> public static void sortSeqs(List<String> seqs) {
> - Collections.sort(seqs, (o1, o2) -> {
> - int i = getSeq(o1) - getSeq(o2);
> - return i == 0 ? o1.compareTo(o2) : i;
> - });
> + seqs.sort(Comparator.comparingInt(LeaderElector::getSeq).thenComparing(Function.identity()));
> }
>
> void retryElection(ElectionContext context, boolean joinAtHead) throws KeeperException, InterruptedException, IOException {
> diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
> index f70276c..1366970 100644
> --- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
> +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
> @@ -197,9 +197,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
> log.warn("Stopping recovery for core=[{}] coreNodeName=[{}]", coreName, coreZkNodeName);
> }
>
> - final private void recoveryFailed(final SolrCore core,
> - final ZkController zkController, final String baseUrl,
> - final String shardZkNodeName, final CoreDescriptor cd) throws Exception {
> + final private void recoveryFailed(final ZkController zkController,
> + final CoreDescriptor cd) throws Exception {
> SolrException.log(log, "Recovery failed - I give up.");
> try {
> zkController.publish(cd, Replica.State.RECOVERY_FAILED);
> @@ -423,63 +422,64 @@ public class RecoveryStrategy implements Runnable, Closeable {
> }
>
> if (!successfulRecovery) {
> - // lets pause for a moment and we need to try again...
> - // TODO: we don't want to retry for some problems?
> - // Or do a fall off retry...
> - try {
> + if (waitBetweenRecoveries(core.getName())) break;
> + }
> + }
> + // We skip core.seedVersionBuckets(); We don't have a transaction log
> + log.info("Finished recovery process, successful=[{}]", successfulRecovery);
> + }
>
> - if (isClosed()) {
> - if (log.isInfoEnabled()) {
> - log.info("Recovery for core {} has been closed", core.getName());
> - }
> - break;
> - }
> + /**
> + * @return true if we have reached max attempts or should stop recovering for some other reason
> + */
> + private boolean waitBetweenRecoveries(String coreName) {
> + // lets pause for a moment and we need to try again...
> + // TODO: we don't want to retry for some problems?
> + // Or do a fall off retry...
> + try {
> + if (isClosed()) {
> + log.info("Recovery for core {} has been closed", coreName);
> + return true;
> + }
>
> - log.error("Recovery failed - trying again... ({})", retries);
> + log.error("Recovery failed - trying again... ({})", retries);
>
> - retries++;
> - if (retries >= maxRetries) {
> - SolrException.log(log, "Recovery failed - max retries exceeded (" + retries + ").");
> - try {
> - recoveryFailed(core, zkController, baseUrl, coreZkNodeName, this.coreDescriptor);
> - } catch (Exception e) {
> - SolrException.log(log, "Could not publish that recovery failed", e);
> - }
> - break;
> - }
> + retries++;
> + if (retries >= maxRetries) {
> + SolrException.log(log, "Recovery failed - max retries exceeded (" + retries + ").");
> + try {
> + recoveryFailed(zkController, this.coreDescriptor);
> } catch (Exception e) {
> - SolrException.log(log, "An error has occurred during recovery", e);
> + SolrException.log(log, "Could not publish that recovery failed", e);
> }
> + return true;
> + }
> + } catch (Exception e) {
> + SolrException.log(log, "An error has occurred during recovery", e);
> + }
>
> - try {
> - // Wait an exponential interval between retries, start at 5 seconds and work up to a minute.
> - // If we're at attempt >= 4, there's no point computing pow(2, retries) because the result
> - // will always be the minimum of the two (12). Since we sleep at 5 seconds sub-intervals in
> - // order to check if we were closed, 12 is chosen as the maximum loopCount (5s * 12 = 1m).
> - int loopCount = retries < 4 ? (int) Math.min(Math.pow(2, retries), 12) : 12;
> - if (log.isInfoEnabled()) {
> - log.info("Wait [{}] seconds before trying to recover again (attempt={})",
> + try {
> + // Wait an exponential interval between retries, start at 4 seconds and work up to a minute.
> + // Meanwhile we will check in 2s sub-intervals to see if we've been closed
> + // Maximum loop count is 30 because we never want to wait longer than a minute (2s * 30 = 1m)
> + int loopCount = retries < 5 ? (int) Math.pow(2, retries) : 30;
> + if (log.isInfoEnabled()) {
> + log.info("Wait [{}] seconds before trying to recover again (attempt={})",
> TimeUnit.MILLISECONDS.toSeconds(loopCount * startingRecoveryDelayMilliSeconds), retries);
> - }
> - for (int i = 0; i < loopCount; i++) {
> - if (isClosed()) {
> - if (log.isInfoEnabled()) {
> - log.info("Recovery for core {} has been closed", core.getName());
> - }
> - break; // check if someone closed us
> - }
> - Thread.sleep(startingRecoveryDelayMilliSeconds);
> - }
> - } catch (InterruptedException e) {
> - Thread.currentThread().interrupt();
> - log.warn("Recovery was interrupted.", e);
> - close = true;
> + }
> + for (int i = 0; i < loopCount; i++) {
> + if (isClosed()) {
> + log.info("Recovery for core {} has been closed", coreName);
> + break; // check if someone closed us
> }
> + Thread.sleep(startingRecoveryDelayMilliSeconds);
> }
> -
> + } catch (InterruptedException e) {
> + Thread.currentThread().interrupt();
> + log.warn("Recovery was interrupted.", e);
> + close = true;
> }
> - // We skip core.seedVersionBuckets(); We don't have a transaction log
> - log.info("Finished recovery process, successful=[{}]", successfulRecovery);
> + return false;
> }
>
> // TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
> @@ -490,8 +490,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
> ulog = core.getUpdateHandler().getUpdateLog();
> if (ulog == null) {
> SolrException.log(log, "No UpdateLog found - cannot recover.");
> - recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
> - this.coreDescriptor);
> + recoveryFailed(zkController,
> + this.coreDescriptor);
> return;
> }
>
> @@ -709,7 +709,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
> }
> zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
> } catch (Exception e) {
> - log.error("Could not publish as ACTIVE after succesful recovery", e);
> + log.error("Could not publish as ACTIVE after successful recovery", e);
> successfulRecovery = false;
> }
>
> @@ -721,53 +721,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
> }
>
> if (!successfulRecovery) {
> - // lets pause for a moment and we need to try again...
> - // TODO: we don't want to retry for some problems?
> - // Or do a fall off retry...
> - try {
> -
> - if (isClosed()) {
> - log.info("RecoveryStrategy has been closed");
> - break;
> - }
> -
> - log.error("Recovery failed - trying again... ({})", retries);
> -
> - retries++;
> - if (retries >= maxRetries) {
> - SolrException.log(log, "Recovery failed - max retries exceeded (" + retries + ").");
> - try {
> - recoveryFailed(core, zkController, baseUrl, coreZkNodeName, this.coreDescriptor);
> - } catch (Exception e) {
> - SolrException.log(log, "Could not publish that recovery failed", e);
> - }
> - break;
> - }
> - } catch (Exception e) {
> - SolrException.log(log, "An error has occurred during recovery", e);
> - }
> -
> - try {
> - // Wait an exponential interval between retries, start at 2 seconds and work up to a minute.
> - // Since we sleep at 2 seconds sub-intervals in
> - // order to check if we were closed, 30 is chosen as the maximum loopCount (2s * 30 = 1m).
> - double loopCount = Math.min(Math.pow(2, retries - 1), 30);
> - log.info("Wait [{}] seconds before trying to recover again (attempt={})",
> - loopCount * startingRecoveryDelayMilliSeconds, retries);
> - for (int i = 0; i < loopCount; i++) {
> - if (isClosed()) {
> - log.info("RecoveryStrategy has been closed");
> - break; // check if someone closed us
> - }
> - Thread.sleep(startingRecoveryDelayMilliSeconds);
> - }
> - } catch (InterruptedException e) {
> - Thread.currentThread().interrupt();
> - log.warn("Recovery was interrupted.", e);
> - close = true;
> - }
> + if (waitBetweenRecoveries(core.getName())) break;
> }
> -
> }
>
> // if replay was skipped (possibly to due pulling a full index from the leader),
> @@ -780,6 +735,10 @@ public class RecoveryStrategy implements Runnable, Closeable {
> log.info("Finished recovery process, successful=[{}]", successfulRecovery);
> }
>
> + /**
> + * Make sure we can connect to the shard leader as currently defined in ZK
> + * @param ourUrl if the leader url is the same as our url, we will skip trying to connect
> + */
> private final Replica pingLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown)
> throws Exception {
> int numTried = 0;
> diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
> index 69ccf75..68b062e 100644
> --- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
> +++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
> @@ -46,7 +46,6 @@ import org.apache.zookeeper.KeeperException.SessionExpiredException;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> -// add core container and stop passing core around...
> final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
> private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
>
> @@ -120,11 +119,10 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
> zkController.getOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
> }
>
> - boolean allReplicasInLine = false;
> if (!weAreReplacement) {
> - allReplicasInLine = waitForReplicasToComeUp(leaderVoteWait);
> + waitForReplicasToComeUp(leaderVoteWait);
> } else {
> - allReplicasInLine = areAllReplicasParticipating();
> + areAllReplicasParticipating();
> }
>
> if (isClosed) {
> @@ -237,7 +235,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
>
> }
>
> - boolean isLeader = true;
> if (!isClosed) {
> try {
> if (replicaType == Replica.Type.TLOG) {
> @@ -281,7 +278,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
> throw new SolrException(ErrorCode.SERVER_ERROR,
> "ZK session expired - cancelling election for " + collection + " " + shardId);
> } catch (Exception e) {
> - isLeader = false;
> SolrException.log(log, "There was a problem trying to register as the leader", e);
>
> try (SolrCore core = cc.getCore(coreName)) {
> diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
> index d71adb1..6cc2afe 100644
> --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
> +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
> @@ -64,11 +64,33 @@ import org.apache.solr.cloud.overseer.SliceMutator;
> import org.apache.solr.common.AlreadyClosedException;
> import org.apache.solr.common.SolrException;
> import org.apache.solr.common.SolrException.ErrorCode;
> -import org.apache.solr.common.cloud.*;
> +import org.apache.solr.common.cloud.BeforeReconnect;
> +import org.apache.solr.common.cloud.ClusterState;
> +import org.apache.solr.common.cloud.ConnectionManager;
> +import org.apache.solr.common.cloud.DefaultConnectionStrategy;
> +import org.apache.solr.common.cloud.DefaultZkACLProvider;
> +import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
> +import org.apache.solr.common.cloud.DocCollection;
> +import org.apache.solr.common.cloud.DocCollectionWatcher;
> +import org.apache.solr.common.cloud.LiveNodesListener;
> +import org.apache.solr.common.cloud.NodesSysPropsCacher;
> +import org.apache.solr.common.cloud.OnReconnect;
> +import org.apache.solr.common.cloud.Replica;
> import org.apache.solr.common.cloud.Replica.Type;
> +import org.apache.solr.common.cloud.Slice;
> +import org.apache.solr.common.cloud.SolrZkClient;
> +import org.apache.solr.common.cloud.UrlScheme;
> +import org.apache.solr.common.cloud.ZkACLProvider;
> +import org.apache.solr.common.cloud.ZkCmdExecutor;
> +import org.apache.solr.common.cloud.ZkConfigManager;
> +import org.apache.solr.common.cloud.ZkCoreNodeProps;
> +import org.apache.solr.common.cloud.ZkCredentialsProvider;
> +import org.apache.solr.common.cloud.ZkMaintenanceUtils;
> +import org.apache.solr.common.cloud.ZkNodeProps;
> +import org.apache.solr.common.cloud.ZkStateReader;
> +import org.apache.solr.common.cloud.ZooKeeperException;
> import org.apache.solr.common.params.CollectionParams;
> import org.apache.solr.common.params.CommonParams;
> -import org.apache.solr.common.params.CoreAdminParams;
> import org.apache.solr.common.params.SolrParams;
> import org.apache.solr.common.util.ExecutorUtil;
> import org.apache.solr.common.util.IOUtils;
> @@ -109,7 +131,6 @@ import static org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
> import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
> import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
> import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
> -import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
> import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
>
> /**
> @@ -174,6 +195,11 @@ public class ZkController implements Closeable {
> } else if (!coreNodeName.equals(other.coreNodeName)) return false;
> return true;
> }
> +
> + @Override
> + public String toString() {
> + return collection + ':' + coreNodeName;
> + }
> }
>
> private final Map<ContextKey, ElectionContext> electionContexts = Collections.synchronizedMap(new HashMap<>());
> @@ -652,55 +678,46 @@ public class ZkController implements Closeable {
> /**
> * Best effort to give up the leadership of a shard in a core after hitting a tragic exception
> * @param cd The current core descriptor
> - * @param tragicException The tragic exception from the {@code IndexWriter}
> */
> - public void giveupLeadership(CoreDescriptor cd, Throwable tragicException) {
> - assert tragicException != null;
> + public void giveupLeadership(CoreDescriptor cd) {
> assert cd != null;
> - DocCollection dc = getClusterState().getCollectionOrNull(cd.getCollectionName());
> +
> + String collection = cd.getCollectionName();
> + if (collection == null) return;
> +
> + DocCollection dc = getClusterState().getCollectionOrNull(collection);
> if (dc == null) return;
>
> Slice shard = dc.getSlice(cd.getCloudDescriptor().getShardId());
> if (shard == null) return;
>
> // if this replica is not a leader, it will be put in recovery state by the leader
> - if (shard.getReplica(cd.getCloudDescriptor().getCoreNodeName()) != shard.getLeader()) return;
> + String leader = cd.getCloudDescriptor().getCoreNodeName();
> + if (shard.getReplica(leader) != shard.getLeader()) return;
>
> + Set<String> liveNodes = getClusterState().getLiveNodes();
> int numActiveReplicas = shard.getReplicas(
> rep -> rep.getState() == Replica.State.ACTIVE
> && rep.getType() != Type.PULL
> - && getClusterState().getLiveNodes().contains(rep.getNodeName())
> + && liveNodes.contains(rep.getNodeName())
> ).size();
>
> // at least the leader still be able to search, we should give up leadership if other replicas can take over
> if (numActiveReplicas >= 2) {
> - String key = cd.getCollectionName() + ":" + cd.getCloudDescriptor().getCoreNodeName();
> - //TODO better handling the case when delete replica was failed
> - if (replicasMetTragicEvent.putIfAbsent(key, tragicException) == null) {
> - log.warn("Leader {} met tragic exception, give up its leadership", key, tragicException);
> + ContextKey key = new ContextKey(collection, leader);
> + ElectionContext context = electionContexts.get(key);
> + if (context instanceof ShardLeaderElectionContextBase) {
> + LeaderElector elector = ((ShardLeaderElectionContextBase) context).getLeaderElector();
> try {
> - // by using Overseer to remove and add replica back, we can do the task in an async/robust manner
> - Map<String,Object> props = new HashMap<>();
> - props.put(Overseer.QUEUE_OPERATION, "deletereplica");
> - props.put(COLLECTION_PROP, cd.getCollectionName());
> - props.put(SHARD_ID_PROP, shard.getName());
> - props.put(REPLICA_PROP, cd.getCloudDescriptor().getCoreNodeName());
> - getOverseerCollectionQueue().offer(Utils.toJSON(new ZkNodeProps(props)));
> -
> - props.clear();
> - props.put(Overseer.QUEUE_OPERATION, "addreplica");
> - props.put(COLLECTION_PROP, cd.getCollectionName());
> - props.put(SHARD_ID_PROP, shard.getName());
> - props.put(ZkStateReader.REPLICA_TYPE, cd.getCloudDescriptor().getReplicaType().name().toUpperCase(Locale.ROOT));
> - props.put(CoreAdminParams.NODE, getNodeName());
> - getOverseerCollectionQueue().offer(Utils.toJSON(new ZkNodeProps(props)));
> - } catch (Exception e) {
> - // Exceptions are not bubbled up. giveupLeadership is best effort, and is only called in case of some other
> - // unrecoverable error happened
> - log.error("Met exception on give up leadership for {}", key, e);
> - replicasMetTragicEvent.remove(key);
> + log.warn("Leader {} met tragic exception, give up its leadership", key);
> + elector.retryElection(context, false);
> + } catch (KeeperException | InterruptedException | IOException e) {
> SolrZkClient.checkInterrupted(e);
> + log.error("Met exception on give up leadership for {}", key, e);
> }
> + } else {
> + // The node is probably already gone
> + log.warn("Could not get election context {} to give up leadership", key);
> }
> }
> }
> diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
> index cc33205..6c38797 100644
> --- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
> +++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
> @@ -25,6 +25,7 @@ import java.util.Set;
> import java.util.concurrent.TimeoutException;
> import java.util.concurrent.atomic.AtomicBoolean;
> import java.util.concurrent.atomic.AtomicReference;
> +import java.util.function.Function;
>
> import org.apache.solr.client.solrj.cloud.ShardTerms;
> import org.apache.solr.common.SolrException;
> @@ -73,7 +74,7 @@ public class ZkShardTerms implements AutoCloseable{
> private final Set<CoreTermWatcher> listeners = new HashSet<>();
> private final AtomicBoolean isClosed = new AtomicBoolean(false);
>
> - private AtomicReference<ShardTerms> terms = new AtomicReference<>();
> + private final AtomicReference<ShardTerms> terms = new AtomicReference<>();
>
> /**
> * Listener of a core for shard's term change events
> @@ -106,17 +107,15 @@ public class ZkShardTerms implements AutoCloseable{
> }
>
> /**
> - * Ensure that leader's term is higher than some replica's terms
> + * Ensure that terms are higher than some replica's terms. If the current leader is attempting to give up
> + * leadership and included in replicasNeedingRecovery, then other replicas that are in sync will have higher
> + * terms, while the leader will stay where it is.
> * @param leader coreNodeName of leader
> * @param replicasNeedingRecovery set of replicas in which their terms should be lower than leader's term
> */
> public void ensureTermsIsHigher(String leader, Set<String> replicasNeedingRecovery) {
> if (replicasNeedingRecovery.isEmpty()) return;
> -
> - ShardTerms newTerms;
> - while( (newTerms = terms.get().increaseTerms(leader, replicasNeedingRecovery)) != null) {
> - if (forceSaveTerms(newTerms)) return;
> - }
> + mutate(terms -> terms.increaseTerms(leader, replicasNeedingRecovery));
> }
>
> public ShardTerms getShardTerms() {
> @@ -206,10 +205,7 @@ public class ZkShardTerms implements AutoCloseable{
> * @param coreNodeName of the replica
> */
> void registerTerm(String coreNodeName) {
> - ShardTerms newTerms;
> - while ( (newTerms = terms.get().registerTerm(coreNodeName)) != null) {
> - if (forceSaveTerms(newTerms)) break;
> - }
> + mutate(terms -> terms.registerTerm(coreNodeName));
> }
>
> /**
> @@ -218,37 +214,29 @@ public class ZkShardTerms implements AutoCloseable{
> * @param coreNodeName of the replica
> */
> public void setTermEqualsToLeader(String coreNodeName) {
> - ShardTerms newTerms;
> - while ( (newTerms = terms.get().setTermEqualsToLeader(coreNodeName)) != null) {
> - if (forceSaveTerms(newTerms)) break;
> - }
> + mutate(terms -> terms.setTermEqualsToLeader(coreNodeName));
> }
>
> + /**
> + * Set a replica's term to 0. If the term does not exist, create it.
> + * @param coreNodeName of the replica
> + */
> public void setTermToZero(String coreNodeName) {
> - ShardTerms newTerms;
> - while ( (newTerms = terms.get().setTermToZero(coreNodeName)) != null) {
> - if (forceSaveTerms(newTerms)) break;
> - }
> + mutate(terms -> terms.setTermToZero(coreNodeName));
> }
>
> /**
> * Mark {@code coreNodeName} as recovering
> */
> public void startRecovering(String coreNodeName) {
> - ShardTerms newTerms;
> - while ( (newTerms = terms.get().startRecovering(coreNodeName)) != null) {
> - if (forceSaveTerms(newTerms)) break;
> - }
> + mutate(terms -> terms.startRecovering(coreNodeName));
> }
>
> /**
> * Mark {@code coreNodeName} as finished recovering
> */
> public void doneRecovering(String coreNodeName) {
> - ShardTerms newTerms;
> - while ( (newTerms = terms.get().doneRecovering(coreNodeName)) != null) {
> - if (forceSaveTerms(newTerms)) break;
> - }
> + mutate(terms -> terms.doneRecovering(coreNodeName));
> }
>
> public boolean isRecovering(String name) {
> @@ -260,8 +248,17 @@ public class ZkShardTerms implements AutoCloseable{
> * so we must switch from term 0 (registered) to 1 (have some data)
> */
> public void ensureHighestTermsAreNotZero() {
> + mutate(ShardTerms::ensureHighestTermsAreNotZero);
> + }
> +
> + /**
> + * Attempt to apply an action and save the results, retrying as necessary.
> + * If action returns null, then we are done and will not make additional retries.
> + * @param action The mutation to apply to current shard terms before saving
> + */
> + private void mutate(Function<ShardTerms, ShardTerms> action) {
> ShardTerms newTerms;
> - while ( (newTerms = terms.get().ensureHighestTermsAreNotZero()) != null) {
> + while ((newTerms = action.apply(terms.get())) != null) {
> if (forceSaveTerms(newTerms)) break;
> }
> }
> @@ -315,7 +312,8 @@ public class ZkShardTerms implements AutoCloseable{
> refreshTerms();
> } catch (KeeperException.NoNodeException e) {
> throw e;
> - } catch (Exception e) {
> + } catch (RuntimeException | KeeperException | InterruptedException e) {
> + SolrZkClient.checkInterrupted(e);
> throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error while saving shard term for collection: " + collection, e);
> }
> return false;
> @@ -336,13 +334,10 @@ public class ZkShardTerms implements AutoCloseable{
> // it's okay if another beats us creating the node
> }
>
> - } catch (InterruptedException e) {
> - Thread.interrupted();
> + } catch (KeeperException | InterruptedException e) {
> + Throwable cause = SolrZkClient.checkInterrupted(e);
> throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
> - "Error creating shard term node in Zookeeper for collection: " + collection, e);
> - } catch (KeeperException e) {
> - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
> - "Error creating shard term node in Zookeeper for collection: " + collection, e);
> + "Error creating shard term node in Zookeeper for collection: " + collection, cause);
> }
> }
>
> @@ -356,11 +351,10 @@ public class ZkShardTerms implements AutoCloseable{
> Stat stat = new Stat();
> byte[] data = zkClient.getData(znodePath, null, stat, true);
> newTerms = new ShardTerms((Map<String, Long>) Utils.fromJSON(data), stat.getVersion());
> - } catch (KeeperException e) {
> - Thread.interrupted();
> - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection: " + collection, e);
> - } catch (InterruptedException e) {
> - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection: " + collection, e);
> + } catch (KeeperException | InterruptedException e) {
> + Throwable cause = SolrZkClient.checkInterrupted(e);
> + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
> + "Error updating shard term for collection: " + collection, cause);
> }
>
> setNewTerms(newTerms);
> @@ -408,7 +402,7 @@ public class ZkShardTerms implements AutoCloseable{
> // exists operation is faster than getData operation
> zkClient.exists(znodePath, watcher, true);
> } catch (InterruptedException e) {
> - Thread.interrupted();
> + Thread.currentThread().interrupt();
> throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error watching shard term for collection: " + collection, e);
> }
> }
> diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
> index c3fb644..6022f83 100644
> --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
> +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
> @@ -2207,7 +2207,15 @@ public class CoreContainer {
> }
>
> if (tragicException != null && isZooKeeperAware()) {
> - getZkController().giveupLeadership(solrCore.getCoreDescriptor(), tragicException);
> + getZkController().giveupLeadership(solrCore.getCoreDescriptor());
> +
> + try {
> + // If the error was something like a full file system disconnect, this probably won't help
> + // But if it is a transient disk failure then it's worth a try
> + solrCore.getSolrCoreState().newIndexWriter(solrCore, false); // should we rollback?
> + } catch (IOException e) {
> + log.warn("Could not roll index writer after tragedy");
> + }
> }
>
> return tragicException != null;
> diff --git a/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java b/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java
> index c2dbd0e..080a696 100644
> --- a/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java
> +++ b/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java
> @@ -46,6 +46,7 @@ import org.apache.solr.request.SolrRequestHandler;
> import org.apache.solr.response.SolrQueryResponse;
> import org.apache.solr.search.SyntaxError;
> import org.apache.solr.util.SolrPluginUtils;
> +import org.apache.solr.util.TestInjection;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> @@ -206,6 +207,7 @@ public abstract class RequestHandlerBase implements SolrRequestHandler, SolrInfo
> @SuppressWarnings("resource")
> Timer.Context dTimer = distrib ? distribRequestTimes.time() : localRequestTimes.time();
> try {
> + TestInjection.injectLeaderTragedy(req.getCore());
> if (pluginInfo != null && pluginInfo.attributes.containsKey(USEPARAM))
> req.getContext().put(USEPARAM, pluginInfo.attributes.get(USEPARAM));
> SolrPluginUtils.setDefaults(this, req, defaults, appends, invariants);
> diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
> index f4fb7e0..0832126 100644
> --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
> +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
> @@ -1370,6 +1370,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
> //TODO only increase terms of replicas less out-of-sync
> liveReplicas.stream()
> .filter(rep -> zkShardTerms.registered(rep.getName()))
> + // TODO should this all be done at once instead of increasing each replica individually?
> .forEach(rep -> zkShardTerms.setTermEqualsToLeader(rep.getName()));
> }
>
> diff --git a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
> index 8f3fdb2..239fbec 100644
> --- a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
> +++ b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
> @@ -318,7 +318,7 @@ class RebalanceLeaders {
>
> // Put the replica in at the head of the queue and send all nodes with the same sequence number to the back of the list
> // There can be "ties", i.e. replicas in the queue with the same sequence number. Sorting doesn't necessarily sort
> - // the one we most care about first. So put the node we _don't care about at the end of the election queuel
> + // the one we most care about first. So put the node we _don't care about at the end of the election queue_
>
> void makeReplicaFirstWatcher(Slice slice, Replica replica)
> throws KeeperException, InterruptedException {
> diff --git a/solr/core/src/java/org/apache/solr/servlet/ResponseUtils.java b/solr/core/src/java/org/apache/solr/servlet/ResponseUtils.java
> index 8a5f2eb..b7df6fe 100644
> --- a/solr/core/src/java/org/apache/solr/servlet/ResponseUtils.java
> +++ b/solr/core/src/java/org/apache/solr/servlet/ResponseUtils.java
> @@ -69,7 +69,7 @@ public class ResponseUtils {
> if (code == 500 || code < 100) {
> StringWriter sw = new StringWriter();
> ex.printStackTrace(new PrintWriter(sw));
> - SolrException.log(log, null, ex);
> + SolrException.log(log, ex);
> info.add("trace", sw.toString());
>
> // non standard codes have undefined results with various servers
> diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java b/solr/core/src/java/org/apache/solr/util/TestInjection.java
> index 3ae2349..edee292 100644
> --- a/solr/core/src/java/org/apache/solr/util/TestInjection.java
> +++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java
> @@ -16,6 +16,7 @@
> */
> package org.apache.solr.util;
>
> +import java.io.IOException;
> import java.lang.invoke.MethodHandles;
> import java.lang.reflect.Method;
> import java.util.Collections;
> @@ -31,11 +32,13 @@ import java.util.concurrent.atomic.AtomicInteger;
> import java.util.regex.Matcher;
> import java.util.regex.Pattern;
>
> +import org.apache.lucene.index.IndexWriter;
> import org.apache.solr.common.NonExistentCoreException;
> import org.apache.solr.common.SolrException;
> import org.apache.solr.common.SolrException.ErrorCode;
> import org.apache.solr.common.util.Pair;
> import org.apache.solr.core.CoreContainer;
> +import org.apache.solr.core.SolrCore;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> @@ -111,6 +114,8 @@ public class TestInjection {
>
> public volatile static String failUpdateRequests = null;
>
> + public volatile static String leaderTragedy = null;
> +
> public volatile static String nonExistentCoreExceptionAfterUnload = null;
>
> public volatile static String updateLogReplayRandomPause = null;
> @@ -171,6 +176,7 @@ public class TestInjection {
> nonGracefullClose = null;
> failReplicaRequests = null;
> failUpdateRequests = null;
> + leaderTragedy = null;
> nonExistentCoreExceptionAfterUnload = null;
> updateLogReplayRandomPause = null;
> updateRandomPause = null;
> @@ -337,6 +343,39 @@ public class TestInjection {
>
> return true;
> }
> +
> + public static boolean injectLeaderTragedy(SolrCore core) {
> + if (leaderTragedy != null) {
> + Random rand = random();
> + if (null == rand) return true;
> +
> + Pair<Boolean, Integer> pair = parseValue(leaderTragedy);
> + boolean enabled = pair.first();
> + int chanceIn100 = pair.second();
> +
> + if (! core.getCoreDescriptor().getCloudDescriptor().isLeader()) {
> + return true;
> + }
> +
> + if (enabled && rand.nextInt(100) >= (100 - chanceIn100)) {
> + RefCounted<IndexWriter> writer = null;
> + try {
> + writer = core.getSolrCoreState().getIndexWriter(null);
> + writer.get().onTragicEvent(new Exception("injected tragedy"), "injection");
> + } catch (IOException e) {
> + // Problem getting the writer, but that will likely bubble up later
> + return true;
> + } finally {
> + if (writer != null) {
> + writer.decref();
> + }
> + }
> +
> + throw new SolrException(ErrorCode.SERVER_ERROR, "Random tragedy fail");
> + }
> + }
> + return true;
> + }
>
> public static boolean injectNonExistentCoreExceptionAfterUnload(String cname) {
> if (nonExistentCoreExceptionAfterUnload != null) {
> diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java
> index 2be7add..f15ad47 100644
> --- a/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java
> +++ b/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java
> @@ -17,145 +17,120 @@
>
> package org.apache.solr.cloud;
>
> -import static org.hamcrest.CoreMatchers.anyOf;
> -import static org.hamcrest.CoreMatchers.is;
> -
> -import java.io.FileNotFoundException;
> -import java.io.IOException;
> -import java.lang.invoke.MethodHandles;
> -import java.nio.file.NoSuchFileException;
> -import java.util.ArrayList;
> -import java.util.Collections;
> -import java.util.List;
> import org.apache.lucene.store.AlreadyClosedException;
> -import org.apache.lucene.store.MockDirectoryWrapper;
> -import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
> +import org.apache.solr.client.solrj.SolrClient;
> +import org.apache.solr.client.solrj.SolrQuery;
> +import org.apache.solr.client.solrj.SolrServerException;
> import org.apache.solr.client.solrj.embedded.JettySolrRunner;
> +import org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteSolrException;
> import org.apache.solr.client.solrj.impl.HttpSolrClient;
> -import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
> import org.apache.solr.client.solrj.request.CollectionAdminRequest;
> +import org.apache.solr.client.solrj.request.QueryRequest;
> import org.apache.solr.client.solrj.request.UpdateRequest;
> -import org.apache.solr.common.SolrException;
> +import org.apache.solr.client.solrj.response.QueryResponse;
> +import org.apache.solr.client.solrj.response.UpdateResponse;
> import org.apache.solr.common.cloud.ClusterStateUtil;
> import org.apache.solr.common.cloud.DocCollection;
> import org.apache.solr.common.cloud.Replica;
> import org.apache.solr.common.cloud.Slice;
> -import org.apache.solr.core.CoreContainer;
> -import org.apache.solr.core.DirectoryFactory;
> -import org.apache.solr.core.MockDirectoryFactory;
> -import org.apache.solr.core.SolrCore;
> -import org.junit.AfterClass;
> +import org.apache.solr.util.TestInjection;
> +import org.hamcrest.MatcherAssert;
> +import org.junit.After;
> +import org.junit.Before;
> import org.junit.BeforeClass;
> import org.junit.Test;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> -@AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-13237")
> -public class LeaderTragicEventTest extends SolrCloudTestCase {
> +import java.io.IOException;
> +import java.lang.invoke.MethodHandles;
>
> +import static org.hamcrest.CoreMatchers.anyOf;
> +import static org.hamcrest.CoreMatchers.is;
> +
> +public class LeaderTragicEventTest extends SolrCloudTestCase {
> private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
>
> + private String collection;
> +
> @BeforeClass
> public static void setupCluster() throws Exception {
> - System.setProperty("solr.mscheduler", "org.apache.solr.core.MockConcurrentMergeScheduler");
> - System.setProperty(MockDirectoryFactory.SOLR_TESTS_USING_MOCK_DIRECTORY_WRAPPER, "true");
> -
> configureCluster(2)
> .addConfig("config", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
> .configure();
> }
>
> - @AfterClass
> - public static void cleanup() {
> - System.clearProperty("solr.mscheduler");
> - System.clearProperty(MockDirectoryFactory.SOLR_TESTS_USING_MOCK_DIRECTORY_WRAPPER);
> + @Before
> + public void setUp() throws Exception {
> + super.setUp();
> + collection = getSaferTestName();
> + cluster.getSolrClient().setDefaultCollection(collection);
> }
>
> + @After
> + public void tearDown() throws Exception {
> + super.tearDown();
> + CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
> + }
>
> @Test
> - public void test() throws Exception {
> - final String collection = "collection1";
> - cluster.getSolrClient().setDefaultCollection(collection);
> + public void testLeaderFailsOver() throws Exception {
> CollectionAdminRequest
> .createCollection(collection, "config", 1, 2)
> .process(cluster.getSolrClient());
> cluster.waitForActiveCollection(collection, 1, 2);
> - try {
> - List<String> addedIds = new ArrayList<>();
> - Replica oldLeader = corruptLeader(collection, addedIds);
>
> - waitForState("Timeout waiting for new replica become leader", collection, (liveNodes, collectionState) -> {
> - Slice slice = collectionState.getSlice("shard1");
> + UpdateResponse updateResponse = new UpdateRequest().add("id", "1").commit(cluster.getSolrClient(), null);
> + assertEquals(0, updateResponse.getStatus());
>
> - if (slice.getReplicas().size() != 2) return false;
> - if (slice.getLeader() == null) return false;
> - if (slice.getLeader().getName().equals(oldLeader.getName())) return false;
> + Replica oldLeader = corruptLeader(collection);
>
> - return true;
> - });
> - ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), collection, 120000);
> - Slice shard = getCollectionState(collection).getSlice("shard1");
> - assertNotSame(shard.getLeader().getNodeName(), oldLeader.getNodeName());
> - assertEquals(getNonLeader(shard).getNodeName(), oldLeader.getNodeName());
> + waitForState("Now waiting for new replica to become leader", collection, (liveNodes, collectionState) -> {
> + Slice slice = collectionState.getSlice("shard1");
>
> - for (String id : addedIds) {
> - assertNotNull(cluster.getSolrClient().getById(collection,id));
> - }
> - if (log.isInfoEnabled()) {
> - log.info("The test success oldLeader:{} currentState:{}", oldLeader, getCollectionState(collection));
> - }
> + if (slice.getReplicas().size() != 2) return false;
> + if (slice.getLeader() == null) return false;
> + if (slice.getLeader().getName().equals(oldLeader.getName())) return false;
>
> - } finally {
> - CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
> + return true;
> + });
> + ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(), collection, 120000);
> + Slice shard = getCollectionState(collection).getSlice("shard1");
> + assertNotEquals("Old leader should not be leader again", oldLeader.getNodeName(), shard.getLeader().getNodeName());
> + assertEquals("Old leader should be a follower", oldLeader.getNodeName(), getNonLeader(shard).getNodeName());
> +
> + // Check that we can continue indexing after this
> + updateResponse = new UpdateRequest().add("id", "2").commit(cluster.getSolrClient(), null);
> + assertEquals(0, updateResponse.getStatus());
> + try (SolrClient followerClient = new HttpSolrClient.Builder(oldLeader.getCoreUrl()).build()) {
> + QueryResponse queryResponse = new QueryRequest(new SolrQuery("*:*")).process(followerClient);
> + assertEquals(queryResponse.getResults().toString(), 2, queryResponse.getResults().getNumFound());
> }
> }
>
> - private Replica corruptLeader(String collection, List<String> addedIds) throws IOException {
> - DocCollection dc = getCollectionState(collection);
> - Replica oldLeader = dc.getLeader("shard1");
> - log.info("Corrupt leader : {}", oldLeader);
> -
> - CoreContainer leaderCC = cluster.getReplicaJetty(oldLeader).getCoreContainer();
> - SolrCore leaderCore = leaderCC.getCores().iterator().next();
> - MockDirectoryWrapper mockDir = (MockDirectoryWrapper) leaderCore.getDirectoryFactory()
> - .get(leaderCore.getIndexDir(), DirectoryFactory.DirContext.DEFAULT, leaderCore.getSolrConfig().indexConfig.lockType);
> - leaderCore.getDirectoryFactory().release(mockDir);
> -
> - try (HttpSolrClient solrClient = new HttpSolrClient.Builder(dc.getLeader("shard1").getCoreUrl()).build()) {
> - for (int i = 0; i < 100; i++) {
> - new UpdateRequest()
> - .add("id", i + "")
> - .process(solrClient);
> - solrClient.commit();
> - addedIds.add(i + "");
> -
> - for (String file : mockDir.listAll()) {
> - if (file.contains("segments_")) continue;
> - if (file.endsWith("si")) continue;
> - if (file.endsWith("fnm")) continue;
> - if (random().nextBoolean()) continue;
> -
> - try {
> - mockDir.corruptFiles(Collections.singleton(file));
> - } catch (RuntimeException | FileNotFoundException | NoSuchFileException e) {
> - // merges can lead to this exception
> - }
> - }
> - }
> - } catch (Exception e) {
> - log.info("Corrupt leader ex: ", e);
> -
> - // solrClient.add/commit would throw RemoteSolrException with error code 500 or
> - // 404(when the leader replica is already deleted by giveupLeadership)
> - if (e instanceof RemoteSolrException) {
> - SolrException se = (SolrException) e;
> - assertThat(se.code(), anyOf(is(500), is(404)));
> - } else if (!(e instanceof AlreadyClosedException)) {
> - throw new RuntimeException("Unexpected exception", e);
> + private Replica corruptLeader(String collection) throws IOException, SolrServerException {
> + try {
> + TestInjection.leaderTragedy = "true:100";
> +
> + DocCollection dc = getCollectionState(collection);
> + Replica oldLeader = dc.getLeader("shard1");
> + log.info("Will crash leader : {}", oldLeader);
> +
> + try (HttpSolrClient solrClient = new HttpSolrClient.Builder(dc.getLeader("shard1").getCoreUrl()).build()) {
> + new UpdateRequest().add("id", "99").commit(solrClient, null);
> + fail("Should have injected tragedy");
> + } catch (RemoteSolrException e) {
> + // solrClient.add would throw RemoteSolrException with code 500
> + // or 404 if the bad replica has already been deleted
> + MatcherAssert.assertThat(e.code(), anyOf(is(500), is(404)));
> + } catch (AlreadyClosedException e) {
> + // If giving up leadership, might be already closed/closing
> }
> - //else expected
> +
> + return oldLeader;
> + } finally {
> + TestInjection.leaderTragedy = null;
> }
> - return oldLeader;
> }
>
> private Replica getNonLeader(Slice slice) {
> @@ -165,8 +140,6 @@ public class LeaderTragicEventTest extends SolrCloudTestCase {
>
> @Test
> public void testOtherReplicasAreNotActive() throws Exception {
> - final String collection = "collection2";
> - cluster.getSolrClient().setDefaultCollection(collection);
> int numReplicas = random().nextInt(2) + 1;
> // won't do anything if leader is the only one active replica in the shard
> CollectionAdminRequest
> @@ -174,7 +147,6 @@ public class LeaderTragicEventTest extends SolrCloudTestCase {
> .process(cluster.getSolrClient());
> cluster.waitForActiveCollection(collection, 1, numReplicas);
>
> - try {
> JettySolrRunner otherReplicaJetty = null;
> if (numReplicas == 2) {
> Slice shard = getCollectionState(collection).getSlice("shard1");
> @@ -187,7 +159,7 @@ public class LeaderTragicEventTest extends SolrCloudTestCase {
> waitForState("Timeout waiting for replica get down", collection, (liveNodes, collectionState) -> getNonLeader(collectionState.getSlice("shard1")).getState() != Replica.State.ACTIVE);
> }
>
> - Replica oldLeader = corruptLeader(collection, new ArrayList<>());
> + Replica oldLeader = corruptLeader(collection);
>
> if (otherReplicaJetty != null) {
> otherReplicaJetty.start();
> @@ -196,9 +168,6 @@ public class LeaderTragicEventTest extends SolrCloudTestCase {
>
> Replica leader = getCollectionState(collection).getSlice("shard1").getLeader();
> assertEquals(leader.getName(), oldLeader.getName());
> - } finally {
> - CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
> - }
> }
>
>
> diff --git a/solr/core/src/test/org/apache/solr/cloud/ShardTermsTest.java b/solr/core/src/test/org/apache/solr/cloud/ShardTermsTest.java
> new file mode 100644
> index 0000000..ec20cec
> --- /dev/null
> +++ b/solr/core/src/test/org/apache/solr/cloud/ShardTermsTest.java
> @@ -0,0 +1,48 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +
> +package org.apache.solr.cloud;
> +
> +import org.apache.solr.SolrTestCase;
> +import org.apache.solr.client.solrj.cloud.ShardTerms;
> +import org.junit.Test;
> +
> +import java.util.Collections;
> +import java.util.HashMap;
> +import java.util.Map;
> +
> +public class ShardTermsTest extends SolrTestCase {
> + @Test
> + public void testIncreaseTerms() {
> + Map<String, Long> map = new HashMap<>();
> + map.put("leader", 0L);
> + ShardTerms terms = new ShardTerms(map, 0);
> + terms = terms.increaseTerms("leader", Collections.singleton("replica"));
> + assertEquals(1L, terms.getTerm("leader").longValue());
> +
> + map.put("leader", 2L);
> + map.put("live-replica", 2L);
> + map.put("dead-replica", 1L);
> + terms = new ShardTerms(map, 0);
> + assertNull(terms.increaseTerms("leader", Collections.singleton("dead-replica")));
> +
> + terms = terms.increaseTerms("leader", Collections.singleton("leader"));
> + assertEquals(3L, terms.getTerm("live-replica").longValue());
> + assertEquals(2L, terms.getTerm("leader").longValue());
> + assertEquals(1L, terms.getTerm("dead-replica").longValue());
> + }
> +}
> diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
> index 56ed8ae7..452c0da 100644
> --- a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
> +++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
> @@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicInteger;
> import java.util.function.Supplier;
>
> import org.apache.solr.client.solrj.SolrServerException;
> -import org.apache.solr.client.solrj.cloud.ShardTerms;
> import org.apache.solr.client.solrj.request.CollectionAdminRequest;
> import org.apache.solr.common.util.TimeSource;
> import org.apache.solr.util.TimeOut;
> @@ -265,13 +264,6 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
> replicaTerms.close();
> }
>
> - public void testEnsureTermsIsHigher() {
> - Map<String, Long> map = new HashMap<>();
> - map.put("leader", 0L);
> - ShardTerms terms = new ShardTerms(map, 0);
> - terms = terms.increaseTerms("leader", Collections.singleton("replica"));
> - assertEquals(1L, terms.getTerm("leader").longValue());
> - }
>
> public void testSetTermToZero() {
> String collection = "setTermToZero";
> diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java
> index 3b2f754..cd6ead0 100644
> --- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java
> +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java
> @@ -73,7 +73,6 @@ public class ShardTerms implements MapWriter {
> */
> public boolean haveHighestTermValue(String coreNodeName) {
> if (values.isEmpty()) return true;
> - long maxTerm = Collections.max(values.values());
> return values.getOrDefault(coreNodeName, 0L) == maxTerm;
> }
>
> @@ -92,7 +91,7 @@ public class ShardTerms implements MapWriter {
> throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can not find leader's term " + leader);
> }
>
> - boolean changed = false;
> + boolean saveChanges = false;
> boolean foundReplicasInLowerTerms = false;
>
> HashMap<String, Long> newValues = new HashMap<>(values);
> @@ -102,16 +101,16 @@ public class ShardTerms implements MapWriter {
> if (replicasNeedingRecovery.contains(key)) foundReplicasInLowerTerms = true;
> if (Objects.equals(entry.getValue(), leaderTerm)) {
> if(skipIncreaseTermOf(key, replicasNeedingRecovery)) {
> - changed = true;
> + saveChanges = true; // if we don't skip anybody, then there's no reason to increment
> } else {
> - newValues.put(key, leaderTerm+1);
> + entry.setValue(leaderTerm + 1);
> }
> }
> }
>
> // We should skip the optimization if there are no replicasNeedingRecovery present in local terms,
> // this may indicate that the current value is stale
> - if (!changed && foundReplicasInLowerTerms) return null;
> + if (!saveChanges && foundReplicasInLowerTerms) return null;
> return new ShardTerms(newValues, version);
> }
>
> @@ -167,6 +166,12 @@ public class ShardTerms implements MapWriter {
> return new ShardTerms(newValues, version);
> }
>
> + /**
> + * Return a new {@link ShardTerms} in which the associate term of {@code coreNodeName} is equal to zero,
> + * creating it if it does not previously exist.
> + * @param coreNodeName of the replica
> + * @return null if the term of {@code coreNodeName} already exists and is zero
> + */
> public ShardTerms setTermToZero(String coreNodeName) {
> if (values.getOrDefault(coreNodeName, -1L) == 0) {
> return null;
> @@ -182,7 +187,6 @@ public class ShardTerms implements MapWriter {
> * @return null if term of {@code coreNodeName} is already maximum
> */
> public ShardTerms setTermEqualsToLeader(String coreNodeName) {
> - long maxTerm = getMaxTerm();
> if (values.get(coreNodeName) == maxTerm) return null;
>
> HashMap<String, Long> newValues = new HashMap<>(values);
> @@ -201,7 +205,6 @@ public class ShardTerms implements MapWriter {
> * @return null if {@code coreNodeName} is already marked as doing recovering
> */
> public ShardTerms startRecovering(String coreNodeName) {
> - long maxTerm = getMaxTerm();
> if (values.get(coreNodeName) == maxTerm)
> return null;
>
> @@ -246,7 +249,7 @@ public class ShardTerms implements MapWriter {
> return version;
> }
>
> - public Map<String , Long> getTerms() {
> + public Map<String, Long> getTerms() {
> return new HashMap<>(this.values);
> }
>
>
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@lucene.apache.org
For additional commands, e-mail: dev-help@lucene.apache.org
Re: [lucene-solr] branch branch_8x updated: SOLR-15029 Trigger leader
election on index writer tragedy
Posted by Mike Drob <md...@apache.org>.
Thanks for pointing that out, I’ll take care of it today.
On Wed, Dec 16, 2020 at 4:15 AM Robert Muir <rc...@gmail.com> wrote:
> Can we please add proper javadoc and @lucene.internal tag when making
> methods public, especially in classes like IndexWriter like this?
>
> On Tue, Dec 15, 2020 at 5:04 PM <md...@apache.org> wrote:
> >
> > This is an automated email from the ASF dual-hosted git repository.
> >
> > mdrob pushed a commit to branch branch_8x
> > in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
> >
> >
> > The following commit(s) were added to refs/heads/branch_8x by this push:
> > new b090971 SOLR-15029 Trigger leader election on index writer tragedy
> > b090971 is described below
> >
> > commit b090971259f57973941d70d13612e22985a09a8d
> > Author: Mike Drob <md...@apple.com>
> > AuthorDate: Fri Dec 4 15:19:49 2020 -0800
> >
> > SOLR-15029 Trigger leader election on index writer tragedy
> >
> > SOLR-13027 Use TestInjection so that we always have a Tragic Event
> >
> > When we encounter a tragic error in the index writer, we can trigger a
> > leader election instead of queueing up a delete and re-add of the node in
> > question. This should result in a more graceful transition, and the
> > previous leader will eventually be put into recovery by a new leader.
> >
> > Backport removes additional logging from ShardTerms.save because we do
> > not have StackWalker in Java 8.
> > ---
> > .../java/org/apache/lucene/index/IndexWriter.java | 7 +-
> > .../java/org/apache/solr/cloud/LeaderElector.java | 11 +-
> > .../org/apache/solr/cloud/RecoveryStrategy.java | 157
> +++++++-----------
> > .../solr/cloud/ShardLeaderElectionContext.java | 8 +-
> > .../java/org/apache/solr/cloud/ZkController.java | 83 ++++++----
> > .../java/org/apache/solr/cloud/ZkShardTerms.java | 76 ++++-----
> > .../java/org/apache/solr/core/CoreContainer.java | 10 +-
> > .../apache/solr/handler/RequestHandlerBase.java | 2 +
> > .../solr/handler/admin/CollectionsHandler.java | 1 +
> > .../solr/handler/admin/RebalanceLeaders.java | 2 +-
> > .../org/apache/solr/servlet/ResponseUtils.java | 2 +-
> > .../java/org/apache/solr/util/TestInjection.java | 39 +++++
> > .../apache/solr/cloud/LeaderTragicEventTest.java | 177
> +++++++++------------
> > .../test/org/apache/solr/cloud/ShardTermsTest.java | 48 ++++++
> > .../org/apache/solr/cloud/ZkShardTermsTest.java | 8 -
> > .../apache/solr/client/solrj/cloud/ShardTerms.java | 19 ++-
> > 16 files changed, 338 insertions(+), 312 deletions(-)
> >
> > diff --git
> a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
> b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
> > index 06ab80e..5e3182a 100644
> > --- a/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
> > +++ b/lucene/core/src/java/org/apache/lucene/index/IndexWriter.java
> > @@ -5172,11 +5172,12 @@ public class IndexWriter implements Closeable,
> TwoPhaseCommit, Accountable,
> > }
> >
> > /**
> > - * This method should be called on a tragic event ie. if a downstream
> class of the writer
> > + * <p>This method should be called on a tragic event ie. if a
> downstream class of the writer
> > * hits an unrecoverable exception. This method does not rethrow the
> tragic event exception.
> > - * Note: This method will not close the writer but can be called from
> any location without respecting any lock order
> > + * <p>Note: This method will not close the writer but can be called
> from any location without respecting any lock order
> > + * <p>This method is visible for testing, and is not expected to be
> called by client code
> > */
> > - private void onTragicEvent(Throwable tragedy, String location) {
> > + public void onTragicEvent(Throwable tragedy, String location) {
> > // This is not supposed to be tragic: IW is supposed to catch this
> and
> > // ignore, because it means we asked the merge to abort:
> > assert tragedy instanceof MergePolicy.MergeAbortedException ==
> false;
> > diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
> b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
> > index f50aa11..e55ce2b 100644
> > --- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
> > +++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
> > @@ -18,10 +18,11 @@ package org.apache.solr.cloud;
> >
> > import java.io.IOException;
> > import java.lang.invoke.MethodHandles;
> > -import java.util.Collections;
> > +import java.util.Comparator;
> > import java.util.Iterator;
> > import java.util.List;
> > import java.util.Map;
> > +import java.util.function.Function;
> > import java.util.regex.Matcher;
> > import java.util.regex.Pattern;
> >
> > @@ -42,7 +43,7 @@ import org.slf4j.LoggerFactory;
> >
> > /**
> > * Leader Election process. This class contains the logic by which a
> > - * leader is chosen. First call * {@link #setup(ElectionContext)} to
> ensure
> > + * leader is chosen. First call {@link #setup(ElectionContext)} to
> ensure
> > * the election process is init'd. Next call
> > * {@link #joinElection(ElectionContext, boolean)} to start the leader
> election.
> > *
> > @@ -166,7 +167,6 @@ public class LeaderElector {
> > }
> > }
> >
> > - // TODO: get this core param out of here
> > protected void runIamLeaderProcess(final ElectionContext context,
> boolean weAreReplacement) throws KeeperException,
> > InterruptedException, IOException {
> > context.runLeaderProcess(weAreReplacement,0);
> > @@ -378,10 +378,7 @@ public class LeaderElector {
> > * Sort n string sequence list.
> > */
> > public static void sortSeqs(List<String> seqs) {
> > - Collections.sort(seqs, (o1, o2) -> {
> > - int i = getSeq(o1) - getSeq(o2);
> > - return i == 0 ? o1.compareTo(o2) : i;
> > - });
> > +
> seqs.sort(Comparator.comparingInt(LeaderElector::getSeq).thenComparing(Function.identity()));
> > }
> >
> > void retryElection(ElectionContext context, boolean joinAtHead)
> throws KeeperException, InterruptedException, IOException {
> > diff --git
> a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
> b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
> > index f70276c..1366970 100644
> > --- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
> > +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
> > @@ -197,9 +197,8 @@ public class RecoveryStrategy implements Runnable,
> Closeable {
> > log.warn("Stopping recovery for core=[{}] coreNodeName=[{}]",
> coreName, coreZkNodeName);
> > }
> >
> > - final private void recoveryFailed(final SolrCore core,
> > - final ZkController zkController, final String baseUrl,
> > - final String shardZkNodeName, final CoreDescriptor cd) throws
> Exception {
> > + final private void recoveryFailed(final ZkController zkController,
> > + final CoreDescriptor cd) throws
> Exception {
> > SolrException.log(log, "Recovery failed - I give up.");
> > try {
> > zkController.publish(cd, Replica.State.RECOVERY_FAILED);
> > @@ -423,63 +422,64 @@ public class RecoveryStrategy implements Runnable,
> Closeable {
> > }
> >
> > if (!successfulRecovery) {
> > - // lets pause for a moment and we need to try again...
> > - // TODO: we don't want to retry for some problems?
> > - // Or do a fall off retry...
> > - try {
> > + if (waitBetweenRecoveries(core.getName())) break;
> > + }
> > + }
> > + // We skip core.seedVersionBuckets(); We don't have a transaction
> log
> > + log.info("Finished recovery process, successful=[{}]",
> successfulRecovery);
> > + }
> >
> > - if (isClosed()) {
> > - if (log.isInfoEnabled()) {
> > - log.info("Recovery for core {} has been closed",
> core.getName());
> > - }
> > - break;
> > - }
> > + /**
> > + * @return true if we have reached max attempts or should stop
> recovering for some other reason
> > + */
> > + private boolean waitBetweenRecoveries(String coreName) {
> > + // lets pause for a moment and we need to try again...
> > + // TODO: we don't want to retry for some problems?
> > + // Or do a fall off retry...
> > + try {
> > + if (isClosed()) {
> > + log.info("Recovery for core {} has been closed", coreName);
> > + return true;
> > + }
> >
> > - log.error("Recovery failed - trying again... ({})", retries);
> > + log.error("Recovery failed - trying again... ({})", retries);
> >
> > - retries++;
> > - if (retries >= maxRetries) {
> > - SolrException.log(log, "Recovery failed - max retries
> exceeded (" + retries + ").");
> > - try {
> > - recoveryFailed(core, zkController, baseUrl,
> coreZkNodeName, this.coreDescriptor);
> > - } catch (Exception e) {
> > - SolrException.log(log, "Could not publish that recovery
> failed", e);
> > - }
> > - break;
> > - }
> > + retries++;
> > + if (retries >= maxRetries) {
> > + SolrException.log(log, "Recovery failed - max retries exceeded
> (" + retries + ").");
> > + try {
> > + recoveryFailed(zkController, this.coreDescriptor);
> > } catch (Exception e) {
> > - SolrException.log(log, "An error has occurred during
> recovery", e);
> > + SolrException.log(log, "Could not publish that recovery
> failed", e);
> > }
> > + return true;
> > + }
> > + } catch (Exception e) {
> > + SolrException.log(log, "An error has occurred during recovery",
> e);
> > + }
> >
> > - try {
> > - // Wait an exponential interval between retries, start at 5
> seconds and work up to a minute.
> > - // If we're at attempt >= 4, there's no point computing
> pow(2, retries) because the result
> > - // will always be the minimum of the two (12). Since we sleep
> at 5 seconds sub-intervals in
> > - // order to check if we were closed, 12 is chosen as the
> maximum loopCount (5s * 12 = 1m).
> > - int loopCount = retries < 4 ? (int) Math.min(Math.pow(2,
> retries), 12) : 12;
> > - if (log.isInfoEnabled()) {
> > - log.info("Wait [{}] seconds before trying to recover again
> (attempt={})",
> > + try {
> > + // Wait an exponential interval between retries, start at 4
> seconds and work up to a minute.
> > + // Meanwhile we will check in 2s sub-intervals to see if we've
> been closed
> > + // Maximum loop count is 30 because we never want to wait longer
> than a minute (2s * 30 = 1m)
> > + int loopCount = retries < 5 ? (int) Math.pow(2, retries) : 30;
> > + if (log.isInfoEnabled()) {
> > + log.info("Wait [{}] seconds before trying to recover again
> (attempt={})",
> > TimeUnit.MILLISECONDS.toSeconds(loopCount *
> startingRecoveryDelayMilliSeconds), retries);
> > - }
> > - for (int i = 0; i < loopCount; i++) {
> > - if (isClosed()) {
> > - if (log.isInfoEnabled()) {
> > - log.info("Recovery for core {} has been closed",
> core.getName());
> > - }
> > - break; // check if someone closed us
> > - }
> > - Thread.sleep(startingRecoveryDelayMilliSeconds);
> > - }
> > - } catch (InterruptedException e) {
> > - Thread.currentThread().interrupt();
> > - log.warn("Recovery was interrupted.", e);
> > - close = true;
> > + }
> > + for (int i = 0; i < loopCount; i++) {
> > + if (isClosed()) {
> > + log.info("Recovery for core {} has been closed", coreName);
> > + break; // check if someone closed us
> > }
> > + Thread.sleep(startingRecoveryDelayMilliSeconds);
> > }
> > -
> > + } catch (InterruptedException e) {
> > + Thread.currentThread().interrupt();
> > + log.warn("Recovery was interrupted.", e);
> > + close = true;
> > }
> > - // We skip core.seedVersionBuckets(); We don't have a transaction
> log
> > - log.info("Finished recovery process, successful=[{}]",
> successfulRecovery);
> > + return false;
> > }
> >
> > // TODO: perhaps make this grab a new core each time through the loop
> to handle core reloads?
> > @@ -490,8 +490,8 @@ public class RecoveryStrategy implements Runnable,
> Closeable {
> > ulog = core.getUpdateHandler().getUpdateLog();
> > if (ulog == null) {
> > SolrException.log(log, "No UpdateLog found - cannot recover.");
> > - recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
> > - this.coreDescriptor);
> > + recoveryFailed(zkController,
> > + this.coreDescriptor);
> > return;
> > }
> >
> > @@ -709,7 +709,7 @@ public class RecoveryStrategy implements Runnable,
> Closeable {
> > }
> > zkController.publish(this.coreDescriptor,
> Replica.State.ACTIVE);
> > } catch (Exception e) {
> > - log.error("Could not publish as ACTIVE after succesful
> recovery", e);
> > + log.error("Could not publish as ACTIVE after successful
> recovery", e);
> > successfulRecovery = false;
> > }
> >
> > @@ -721,53 +721,8 @@ public class RecoveryStrategy implements Runnable,
> Closeable {
> > }
> >
> > if (!successfulRecovery) {
> > - // lets pause for a moment and we need to try again...
> > - // TODO: we don't want to retry for some problems?
> > - // Or do a fall off retry...
> > - try {
> > -
> > - if (isClosed()) {
> > - log.info("RecoveryStrategy has been closed");
> > - break;
> > - }
> > -
> > - log.error("Recovery failed - trying again... ({})", retries);
> > -
> > - retries++;
> > - if (retries >= maxRetries) {
> > - SolrException.log(log, "Recovery failed - max retries
> exceeded (" + retries + ").");
> > - try {
> > - recoveryFailed(core, zkController, baseUrl,
> coreZkNodeName, this.coreDescriptor);
> > - } catch (Exception e) {
> > - SolrException.log(log, "Could not publish that recovery
> failed", e);
> > - }
> > - break;
> > - }
> > - } catch (Exception e) {
> > - SolrException.log(log, "An error has occurred during
> recovery", e);
> > - }
> > -
> > - try {
> > - // Wait an exponential interval between retries, start at 2
> seconds and work up to a minute.
> > - // Since we sleep at 2 seconds sub-intervals in
> > - // order to check if we were closed, 30 is chosen as the
> maximum loopCount (2s * 30 = 1m).
> > - double loopCount = Math.min(Math.pow(2, retries - 1), 30);
> > - log.info("Wait [{}] seconds before trying to recover again
> (attempt={})",
> > - loopCount * startingRecoveryDelayMilliSeconds, retries);
> > - for (int i = 0; i < loopCount; i++) {
> > - if (isClosed()) {
> > - log.info("RecoveryStrategy has been closed");
> > - break; // check if someone closed us
> > - }
> > - Thread.sleep(startingRecoveryDelayMilliSeconds);
> > - }
> > - } catch (InterruptedException e) {
> > - Thread.currentThread().interrupt();
> > - log.warn("Recovery was interrupted.", e);
> > - close = true;
> > - }
> > + if (waitBetweenRecoveries(core.getName())) break;
> > }
> > -
> > }
> >
> > // if replay was skipped (possibly to due pulling a full index from
> the leader),
> > @@ -780,6 +735,10 @@ public class RecoveryStrategy implements Runnable,
> Closeable {
> > log.info("Finished recovery process, successful=[{}]",
> successfulRecovery);
> > }
> >
> > + /**
> > + * Make sure we can connect to the shard leader as currently defined
> in ZK
> > + * @param ourUrl if the leader url is the same as our url, we will
> skip trying to connect
> > + */
> > private final Replica pingLeader(String ourUrl, CoreDescriptor
> coreDesc, boolean mayPutReplicaAsDown)
> > throws Exception {
> > int numTried = 0;
> > diff --git
> a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
> b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
> > index 69ccf75..68b062e 100644
> > ---
> a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
> > +++
> b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
> > @@ -46,7 +46,6 @@ import
> org.apache.zookeeper.KeeperException.SessionExpiredException;
> > import org.slf4j.Logger;
> > import org.slf4j.LoggerFactory;
> >
> > -// add core container and stop passing core around...
> > final class ShardLeaderElectionContext extends
> ShardLeaderElectionContextBase {
> > private static final Logger log =
> LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
> >
> > @@ -120,11 +119,10 @@ final class ShardLeaderElectionContext extends
> ShardLeaderElectionContextBase {
> >
> zkController.getOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
> > }
> >
> > - boolean allReplicasInLine = false;
> > if (!weAreReplacement) {
> > - allReplicasInLine = waitForReplicasToComeUp(leaderVoteWait);
> > + waitForReplicasToComeUp(leaderVoteWait);
> > } else {
> > - allReplicasInLine = areAllReplicasParticipating();
> > + areAllReplicasParticipating();
> > }
> >
> > if (isClosed) {
> > @@ -237,7 +235,6 @@ final class ShardLeaderElectionContext extends
> ShardLeaderElectionContextBase {
> >
> > }
> >
> > - boolean isLeader = true;
> > if (!isClosed) {
> > try {
> > if (replicaType == Replica.Type.TLOG) {
> > @@ -281,7 +278,6 @@ final class ShardLeaderElectionContext extends
> ShardLeaderElectionContextBase {
> > throw new SolrException(ErrorCode.SERVER_ERROR,
> > "ZK session expired - cancelling election for " +
> collection + " " + shardId);
> > } catch (Exception e) {
> > - isLeader = false;
> > SolrException.log(log, "There was a problem trying to
> register as the leader", e);
> >
> > try (SolrCore core = cc.getCore(coreName)) {
> > diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
> b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
> > index d71adb1..6cc2afe 100644
> > --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
> > +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
> > @@ -64,11 +64,33 @@ import org.apache.solr.cloud.overseer.SliceMutator;
> > import org.apache.solr.common.AlreadyClosedException;
> > import org.apache.solr.common.SolrException;
> > import org.apache.solr.common.SolrException.ErrorCode;
> > -import org.apache.solr.common.cloud.*;
> > +import org.apache.solr.common.cloud.BeforeReconnect;
> > +import org.apache.solr.common.cloud.ClusterState;
> > +import org.apache.solr.common.cloud.ConnectionManager;
> > +import org.apache.solr.common.cloud.DefaultConnectionStrategy;
> > +import org.apache.solr.common.cloud.DefaultZkACLProvider;
> > +import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
> > +import org.apache.solr.common.cloud.DocCollection;
> > +import org.apache.solr.common.cloud.DocCollectionWatcher;
> > +import org.apache.solr.common.cloud.LiveNodesListener;
> > +import org.apache.solr.common.cloud.NodesSysPropsCacher;
> > +import org.apache.solr.common.cloud.OnReconnect;
> > +import org.apache.solr.common.cloud.Replica;
> > import org.apache.solr.common.cloud.Replica.Type;
> > +import org.apache.solr.common.cloud.Slice;
> > +import org.apache.solr.common.cloud.SolrZkClient;
> > +import org.apache.solr.common.cloud.UrlScheme;
> > +import org.apache.solr.common.cloud.ZkACLProvider;
> > +import org.apache.solr.common.cloud.ZkCmdExecutor;
> > +import org.apache.solr.common.cloud.ZkConfigManager;
> > +import org.apache.solr.common.cloud.ZkCoreNodeProps;
> > +import org.apache.solr.common.cloud.ZkCredentialsProvider;
> > +import org.apache.solr.common.cloud.ZkMaintenanceUtils;
> > +import org.apache.solr.common.cloud.ZkNodeProps;
> > +import org.apache.solr.common.cloud.ZkStateReader;
> > +import org.apache.solr.common.cloud.ZooKeeperException;
> > import org.apache.solr.common.params.CollectionParams;
> > import org.apache.solr.common.params.CommonParams;
> > -import org.apache.solr.common.params.CoreAdminParams;
> > import org.apache.solr.common.params.SolrParams;
> > import org.apache.solr.common.util.ExecutorUtil;
> > import org.apache.solr.common.util.IOUtils;
> > @@ -109,7 +131,6 @@ import static
> org.apache.solr.common.cloud.ZkStateReader.CORE_NODE_NAME_PROP;
> > import static
> org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP;
> > import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
> > import static
> org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP;
> > -import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
> > import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
> >
> > /**
> > @@ -174,6 +195,11 @@ public class ZkController implements Closeable {
> > } else if (!coreNodeName.equals(other.coreNodeName)) return false;
> > return true;
> > }
> > +
> > + @Override
> > + public String toString() {
> > + return collection + ':' + coreNodeName;
> > + }
> > }
> >
> > private final Map<ContextKey, ElectionContext> electionContexts =
> Collections.synchronizedMap(new HashMap<>());
> > @@ -652,55 +678,46 @@ public class ZkController implements Closeable {
> > /**
> > * Best effort to give up the leadership of a shard in a core after
> hitting a tragic exception
> > * @param cd The current core descriptor
> > - * @param tragicException The tragic exception from the {@code
> IndexWriter}
> > */
> > - public void giveupLeadership(CoreDescriptor cd, Throwable
> tragicException) {
> > - assert tragicException != null;
> > + public void giveupLeadership(CoreDescriptor cd) {
> > assert cd != null;
> > - DocCollection dc =
> getClusterState().getCollectionOrNull(cd.getCollectionName());
> > +
> > + String collection = cd.getCollectionName();
> > + if (collection == null) return;
> > +
> > + DocCollection dc =
> getClusterState().getCollectionOrNull(collection);
> > if (dc == null) return;
> >
> > Slice shard = dc.getSlice(cd.getCloudDescriptor().getShardId());
> > if (shard == null) return;
> >
> > // if this replica is not a leader, it will be put in recovery
> state by the leader
> > - if (shard.getReplica(cd.getCloudDescriptor().getCoreNodeName()) !=
> shard.getLeader()) return;
> > + String leader = cd.getCloudDescriptor().getCoreNodeName();
> > + if (shard.getReplica(leader) != shard.getLeader()) return;
> >
> > + Set<String> liveNodes = getClusterState().getLiveNodes();
> > int numActiveReplicas = shard.getReplicas(
> > rep -> rep.getState() == Replica.State.ACTIVE
> > && rep.getType() != Type.PULL
> > - &&
> getClusterState().getLiveNodes().contains(rep.getNodeName())
> > + && liveNodes.contains(rep.getNodeName())
> > ).size();
> >
> > // at least the leader still be able to search, we should give up
> leadership if other replicas can take over
> > if (numActiveReplicas >= 2) {
> > - String key = cd.getCollectionName() + ":" +
> cd.getCloudDescriptor().getCoreNodeName();
> > - //TODO better handling the case when delete replica was failed
> > - if (replicasMetTragicEvent.putIfAbsent(key, tragicException) ==
> null) {
> > - log.warn("Leader {} met tragic exception, give up its
> leadership", key, tragicException);
> > + ContextKey key = new ContextKey(collection, leader);
> > + ElectionContext context = electionContexts.get(key);
> > + if (context instanceof ShardLeaderElectionContextBase) {
> > + LeaderElector elector = ((ShardLeaderElectionContextBase)
> context).getLeaderElector();
> > try {
> > - // by using Overseer to remove and add replica back, we can
> do the task in an async/robust manner
> > - Map<String,Object> props = new HashMap<>();
> > - props.put(Overseer.QUEUE_OPERATION, "deletereplica");
> > - props.put(COLLECTION_PROP, cd.getCollectionName());
> > - props.put(SHARD_ID_PROP, shard.getName());
> > - props.put(REPLICA_PROP,
> cd.getCloudDescriptor().getCoreNodeName());
> > - getOverseerCollectionQueue().offer(Utils.toJSON(new
> ZkNodeProps(props)));
> > -
> > - props.clear();
> > - props.put(Overseer.QUEUE_OPERATION, "addreplica");
> > - props.put(COLLECTION_PROP, cd.getCollectionName());
> > - props.put(SHARD_ID_PROP, shard.getName());
> > - props.put(ZkStateReader.REPLICA_TYPE,
> cd.getCloudDescriptor().getReplicaType().name().toUpperCase(Locale.ROOT));
> > - props.put(CoreAdminParams.NODE, getNodeName());
> > - getOverseerCollectionQueue().offer(Utils.toJSON(new
> ZkNodeProps(props)));
> > - } catch (Exception e) {
> > - // Exceptions are not bubbled up. giveupLeadership is best
> effort, and is only called in case of some other
> > - // unrecoverable error happened
> > - log.error("Met exception on give up leadership for {}", key,
> e);
> > - replicasMetTragicEvent.remove(key);
> > + log.warn("Leader {} met tragic exception, give up its
> leadership", key);
> > + elector.retryElection(context, false);
> > + } catch (KeeperException | InterruptedException | IOException
> e) {
> > SolrZkClient.checkInterrupted(e);
> > + log.error("Met exception on give up leadership for {}", key,
> e);
> > }
> > + } else {
> > + // The node is probably already gone
> > + log.warn("Could not get election context {} to give up
> leadership", key);
> > }
> > }
> > }
> > diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
> b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
> > index cc33205..6c38797 100644
> > --- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
> > +++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
> > @@ -25,6 +25,7 @@ import java.util.Set;
> > import java.util.concurrent.TimeoutException;
> > import java.util.concurrent.atomic.AtomicBoolean;
> > import java.util.concurrent.atomic.AtomicReference;
> > +import java.util.function.Function;
> >
> > import org.apache.solr.client.solrj.cloud.ShardTerms;
> > import org.apache.solr.common.SolrException;
> > @@ -73,7 +74,7 @@ public class ZkShardTerms implements AutoCloseable{
> > private final Set<CoreTermWatcher> listeners = new HashSet<>();
> > private final AtomicBoolean isClosed = new AtomicBoolean(false);
> >
> > - private AtomicReference<ShardTerms> terms = new AtomicReference<>();
> > + private final AtomicReference<ShardTerms> terms = new
> AtomicReference<>();
> >
> > /**
> > * Listener of a core for shard's term change events
> > @@ -106,17 +107,15 @@ public class ZkShardTerms implements AutoCloseable{
> > }
> >
> > /**
> > - * Ensure that leader's term is higher than some replica's terms
> > + * Ensure that terms are higher than some replica's terms. If the
> current leader is attempting to give up
> > + * leadership and included in replicasNeedingRecovery, then other
> replicas that are in sync will have higher
> > + * terms, while the leader will stay where it is.
> > * @param leader coreNodeName of leader
> > * @param replicasNeedingRecovery set of replicas in which their
> terms should be lower than leader's term
> > */
> > public void ensureTermsIsHigher(String leader, Set<String>
> replicasNeedingRecovery) {
> > if (replicasNeedingRecovery.isEmpty()) return;
> > -
> > - ShardTerms newTerms;
> > - while( (newTerms = terms.get().increaseTerms(leader,
> replicasNeedingRecovery)) != null) {
> > - if (forceSaveTerms(newTerms)) return;
> > - }
> > + mutate(terms -> terms.increaseTerms(leader,
> replicasNeedingRecovery));
> > }
> >
> > public ShardTerms getShardTerms() {
> > @@ -206,10 +205,7 @@ public class ZkShardTerms implements AutoCloseable{
> > * @param coreNodeName of the replica
> > */
> > void registerTerm(String coreNodeName) {
> > - ShardTerms newTerms;
> > - while ( (newTerms = terms.get().registerTerm(coreNodeName)) !=
> null) {
> > - if (forceSaveTerms(newTerms)) break;
> > - }
> > + mutate(terms -> terms.registerTerm(coreNodeName));
> > }
> >
> > /**
> > @@ -218,37 +214,29 @@ public class ZkShardTerms implements AutoCloseable{
> > * @param coreNodeName of the replica
> > */
> > public void setTermEqualsToLeader(String coreNodeName) {
> > - ShardTerms newTerms;
> > - while ( (newTerms =
> terms.get().setTermEqualsToLeader(coreNodeName)) != null) {
> > - if (forceSaveTerms(newTerms)) break;
> > - }
> > + mutate(terms -> terms.setTermEqualsToLeader(coreNodeName));
> > }
> >
> > + /**
> > + * Set a replica's term to 0. If the term does not exist, create it.
> > + * @param coreNodeName of the replica
> > + */
> > public void setTermToZero(String coreNodeName) {
> > - ShardTerms newTerms;
> > - while ( (newTerms = terms.get().setTermToZero(coreNodeName)) !=
> null) {
> > - if (forceSaveTerms(newTerms)) break;
> > - }
> > + mutate(terms -> terms.setTermToZero(coreNodeName));
> > }
> >
> > /**
> > * Mark {@code coreNodeName} as recovering
> > */
> > public void startRecovering(String coreNodeName) {
> > - ShardTerms newTerms;
> > - while ( (newTerms = terms.get().startRecovering(coreNodeName)) !=
> null) {
> > - if (forceSaveTerms(newTerms)) break;
> > - }
> > + mutate(terms -> terms.startRecovering(coreNodeName));
> > }
> >
> > /**
> > * Mark {@code coreNodeName} as finished recovering
> > */
> > public void doneRecovering(String coreNodeName) {
> > - ShardTerms newTerms;
> > - while ( (newTerms = terms.get().doneRecovering(coreNodeName)) !=
> null) {
> > - if (forceSaveTerms(newTerms)) break;
> > - }
> > + mutate(terms -> terms.doneRecovering(coreNodeName));
> > }
> >
> > public boolean isRecovering(String name) {
> > @@ -260,8 +248,17 @@ public class ZkShardTerms implements AutoCloseable{
> > * so we must switch from term 0 (registered) to 1 (have some data)
> > */
> > public void ensureHighestTermsAreNotZero() {
> > + mutate(ShardTerms::ensureHighestTermsAreNotZero);
> > + }
> > +
> > + /**
> > + * Attempt to apply an action and save the results, retrying as
> necessary.
> > + * If action returns null, then we are done and will not make
> additional retries.
> > + * @param action The mutation to apply to current shard terms before
> saving
> > + */
> > + private void mutate(Function<ShardTerms, ShardTerms> action) {
> > ShardTerms newTerms;
> > - while ( (newTerms = terms.get().ensureHighestTermsAreNotZero()) !=
> null) {
> > + while ((newTerms = action.apply(terms.get())) != null) {
> > if (forceSaveTerms(newTerms)) break;
> > }
> > }
> > @@ -315,7 +312,8 @@ public class ZkShardTerms implements AutoCloseable{
> > refreshTerms();
> > } catch (KeeperException.NoNodeException e) {
> > throw e;
> > - } catch (Exception e) {
> > + } catch (RuntimeException | KeeperException | InterruptedException
> e) {
> > + SolrZkClient.checkInterrupted(e);
> > throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
> "Error while saving shard term for collection: " + collection, e);
> > }
> > return false;
> > @@ -336,13 +334,10 @@ public class ZkShardTerms implements AutoCloseable{
> > // it's okay if another beats us creating the node
> > }
> >
> > - } catch (InterruptedException e) {
> > - Thread.interrupted();
> > + } catch (KeeperException | InterruptedException e) {
> > + Throwable cause = SolrZkClient.checkInterrupted(e);
> > throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
> > - "Error creating shard term node in Zookeeper for collection:
> " + collection, e);
> > - } catch (KeeperException e) {
> > - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
> > - "Error creating shard term node in Zookeeper for collection:
> " + collection, e);
> > + "Error creating shard term node in Zookeeper for collection:
> " + collection, cause);
> > }
> > }
> >
> > @@ -356,11 +351,10 @@ public class ZkShardTerms implements AutoCloseable{
> > Stat stat = new Stat();
> > byte[] data = zkClient.getData(znodePath, null, stat, true);
> > newTerms = new ShardTerms((Map<String, Long>)
> Utils.fromJSON(data), stat.getVersion());
> > - } catch (KeeperException e) {
> > - Thread.interrupted();
> > - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
> "Error updating shard term for collection: " + collection, e);
> > - } catch (InterruptedException e) {
> > - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
> "Error updating shard term for collection: " + collection, e);
> > + } catch (KeeperException | InterruptedException e) {
> > + Throwable cause = SolrZkClient.checkInterrupted(e);
> > + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
> > + "Error updating shard term for collection: " + collection,
> cause);
> > }
> >
> > setNewTerms(newTerms);
> > @@ -408,7 +402,7 @@ public class ZkShardTerms implements AutoCloseable{
> > // exists operation is faster than getData operation
> > zkClient.exists(znodePath, watcher, true);
> > } catch (InterruptedException e) {
> > - Thread.interrupted();
> > + Thread.currentThread().interrupt();
> > throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
> "Error watching shard term for collection: " + collection, e);
> > }
> > }
> > diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
> b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
> > index c3fb644..6022f83 100644
> > --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
> > +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
> > @@ -2207,7 +2207,15 @@ public class CoreContainer {
> > }
> >
> > if (tragicException != null && isZooKeeperAware()) {
> > - getZkController().giveupLeadership(solrCore.getCoreDescriptor(),
> tragicException);
> > + getZkController().giveupLeadership(solrCore.getCoreDescriptor());
> > +
> > + try {
> > + // If the error was something like a full file system
> disconnect, this probably won't help
> > + // But if it is a transient disk failure then it's worth a try
> > + solrCore.getSolrCoreState().newIndexWriter(solrCore, false); //
> should we rollback?
> > + } catch (IOException e) {
> > + log.warn("Could not roll index writer after tragedy");
> > + }
> > }
> >
> > return tragicException != null;
> > diff --git
> a/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java
> b/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java
> > index c2dbd0e..080a696 100644
> > --- a/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java
> > +++ b/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java
> > @@ -46,6 +46,7 @@ import org.apache.solr.request.SolrRequestHandler;
> > import org.apache.solr.response.SolrQueryResponse;
> > import org.apache.solr.search.SyntaxError;
> > import org.apache.solr.util.SolrPluginUtils;
> > +import org.apache.solr.util.TestInjection;
> > import org.slf4j.Logger;
> > import org.slf4j.LoggerFactory;
> >
> > @@ -206,6 +207,7 @@ public abstract class RequestHandlerBase implements
> SolrRequestHandler, SolrInfo
> > @SuppressWarnings("resource")
> > Timer.Context dTimer = distrib ? distribRequestTimes.time() :
> localRequestTimes.time();
> > try {
> > + TestInjection.injectLeaderTragedy(req.getCore());
> > if (pluginInfo != null &&
> pluginInfo.attributes.containsKey(USEPARAM))
> > req.getContext().put(USEPARAM,
> pluginInfo.attributes.get(USEPARAM));
> > SolrPluginUtils.setDefaults(this, req, defaults, appends,
> invariants);
> > diff --git
> a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
> b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
> > index f4fb7e0..0832126 100644
> > ---
> a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
> > +++
> b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
> > @@ -1370,6 +1370,7 @@ public class CollectionsHandler extends
> RequestHandlerBase implements Permission
> > //TODO only increase terms of replicas less out-of-sync
> > liveReplicas.stream()
> > .filter(rep -> zkShardTerms.registered(rep.getName()))
> > + // TODO should this all be done at once instead of
> increasing each replica individually?
> > .forEach(rep ->
> zkShardTerms.setTermEqualsToLeader(rep.getName()));
> > }
> >
> > diff --git
> a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
> b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
> > index 8f3fdb2..239fbec 100644
> > ---
> a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
> > +++
> b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
> > @@ -318,7 +318,7 @@ class RebalanceLeaders {
> >
> > // Put the replica in at the head of the queue and send all nodes
> with the same sequence number to the back of the list
> > // There can be "ties", i.e. replicas in the queue with the same
> sequence number. Sorting doesn't necessarily sort
> > - // the one we most care about first. So put the node we _don't care
> about at the end of the election queuel
> > + // the one we most care about first. So put the node we _don't care
> about at the end of the election queue_
> >
> > void makeReplicaFirstWatcher(Slice slice, Replica replica)
> > throws KeeperException, InterruptedException {
> > diff --git
> a/solr/core/src/java/org/apache/solr/servlet/ResponseUtils.java
> b/solr/core/src/java/org/apache/solr/servlet/ResponseUtils.java
> > index 8a5f2eb..b7df6fe 100644
> > --- a/solr/core/src/java/org/apache/solr/servlet/ResponseUtils.java
> > +++ b/solr/core/src/java/org/apache/solr/servlet/ResponseUtils.java
> > @@ -69,7 +69,7 @@ public class ResponseUtils {
> > if (code == 500 || code < 100) {
> > StringWriter sw = new StringWriter();
> > ex.printStackTrace(new PrintWriter(sw));
> > - SolrException.log(log, null, ex);
> > + SolrException.log(log, ex);
> > info.add("trace", sw.toString());
> >
> > // non standard codes have undefined results with various servers
> > diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java
> b/solr/core/src/java/org/apache/solr/util/TestInjection.java
> > index 3ae2349..edee292 100644
> > --- a/solr/core/src/java/org/apache/solr/util/TestInjection.java
> > +++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java
> > @@ -16,6 +16,7 @@
> > */
> > package org.apache.solr.util;
> >
> > +import java.io.IOException;
> > import java.lang.invoke.MethodHandles;
> > import java.lang.reflect.Method;
> > import java.util.Collections;
> > @@ -31,11 +32,13 @@ import java.util.concurrent.atomic.AtomicInteger;
> > import java.util.regex.Matcher;
> > import java.util.regex.Pattern;
> >
> > +import org.apache.lucene.index.IndexWriter;
> > import org.apache.solr.common.NonExistentCoreException;
> > import org.apache.solr.common.SolrException;
> > import org.apache.solr.common.SolrException.ErrorCode;
> > import org.apache.solr.common.util.Pair;
> > import org.apache.solr.core.CoreContainer;
> > +import org.apache.solr.core.SolrCore;
> > import org.slf4j.Logger;
> > import org.slf4j.LoggerFactory;
> >
> > @@ -111,6 +114,8 @@ public class TestInjection {
> >
> > public volatile static String failUpdateRequests = null;
> >
> > + public volatile static String leaderTragedy = null;
> > +
> > public volatile static String nonExistentCoreExceptionAfterUnload =
> null;
> >
> > public volatile static String updateLogReplayRandomPause = null;
> > @@ -171,6 +176,7 @@ public class TestInjection {
> > nonGracefullClose = null;
> > failReplicaRequests = null;
> > failUpdateRequests = null;
> > + leaderTragedy = null;
> > nonExistentCoreExceptionAfterUnload = null;
> > updateLogReplayRandomPause = null;
> > updateRandomPause = null;
> > @@ -337,6 +343,39 @@ public class TestInjection {
> >
> > return true;
> > }
> > +
> > + public static boolean injectLeaderTragedy(SolrCore core) {
> > + if (leaderTragedy != null) {
> > + Random rand = random();
> > + if (null == rand) return true;
> > +
> > + Pair<Boolean, Integer> pair = parseValue(leaderTragedy);
> > + boolean enabled = pair.first();
> > + int chanceIn100 = pair.second();
> > +
> > + if (! core.getCoreDescriptor().getCloudDescriptor().isLeader()) {
> > + return true;
> > + }
> > +
> > + if (enabled && rand.nextInt(100) >= (100 - chanceIn100)) {
> > + RefCounted<IndexWriter> writer = null;
> > + try {
> > + writer = core.getSolrCoreState().getIndexWriter(null);
> > + writer.get().onTragicEvent(new Exception("injected tragedy"),
> "injection");
> > + } catch (IOException e) {
> > + // Problem getting the writer, but that will likely bubble up
> later
> > + return true;
> > + } finally {
> > + if (writer != null) {
> > + writer.decref();
> > + }
> > + }
> > +
> > + throw new SolrException(ErrorCode.SERVER_ERROR, "Random tragedy
> fail");
> > + }
> > + }
> > + return true;
> > + }
> >
> > public static boolean
> injectNonExistentCoreExceptionAfterUnload(String cname) {
> > if (nonExistentCoreExceptionAfterUnload != null) {
> > diff --git
> a/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java
> b/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java
> > index 2be7add..f15ad47 100644
> > --- a/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java
> > +++ b/solr/core/src/test/org/apache/solr/cloud/LeaderTragicEventTest.java
> > @@ -17,145 +17,120 @@
> >
> > package org.apache.solr.cloud;
> >
> > -import static org.hamcrest.CoreMatchers.anyOf;
> > -import static org.hamcrest.CoreMatchers.is;
> > -
> > -import java.io.FileNotFoundException;
> > -import java.io.IOException;
> > -import java.lang.invoke.MethodHandles;
> > -import java.nio.file.NoSuchFileException;
> > -import java.util.ArrayList;
> > -import java.util.Collections;
> > -import java.util.List;
> > import org.apache.lucene.store.AlreadyClosedException;
> > -import org.apache.lucene.store.MockDirectoryWrapper;
> > -import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
> > +import org.apache.solr.client.solrj.SolrClient;
> > +import org.apache.solr.client.solrj.SolrQuery;
> > +import org.apache.solr.client.solrj.SolrServerException;
> > import org.apache.solr.client.solrj.embedded.JettySolrRunner;
> > +import
> org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteSolrException;
> > import org.apache.solr.client.solrj.impl.HttpSolrClient;
> > -import
> org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
> > import org.apache.solr.client.solrj.request.CollectionAdminRequest;
> > +import org.apache.solr.client.solrj.request.QueryRequest;
> > import org.apache.solr.client.solrj.request.UpdateRequest;
> > -import org.apache.solr.common.SolrException;
> > +import org.apache.solr.client.solrj.response.QueryResponse;
> > +import org.apache.solr.client.solrj.response.UpdateResponse;
> > import org.apache.solr.common.cloud.ClusterStateUtil;
> > import org.apache.solr.common.cloud.DocCollection;
> > import org.apache.solr.common.cloud.Replica;
> > import org.apache.solr.common.cloud.Slice;
> > -import org.apache.solr.core.CoreContainer;
> > -import org.apache.solr.core.DirectoryFactory;
> > -import org.apache.solr.core.MockDirectoryFactory;
> > -import org.apache.solr.core.SolrCore;
> > -import org.junit.AfterClass;
> > +import org.apache.solr.util.TestInjection;
> > +import org.hamcrest.MatcherAssert;
> > +import org.junit.After;
> > +import org.junit.Before;
> > import org.junit.BeforeClass;
> > import org.junit.Test;
> > import org.slf4j.Logger;
> > import org.slf4j.LoggerFactory;
> >
> > -@AwaitsFix(bugUrl="https://issues.apache.org/jira/browse/SOLR-13237")
> > -public class LeaderTragicEventTest extends SolrCloudTestCase {
> > +import java.io.IOException;
> > +import java.lang.invoke.MethodHandles;
> >
> > +import static org.hamcrest.CoreMatchers.anyOf;
> > +import static org.hamcrest.CoreMatchers.is;
> > +
> > +public class LeaderTragicEventTest extends SolrCloudTestCase {
> > private static final Logger log =
> LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
> >
> > + private String collection;
> > +
> > @BeforeClass
> > public static void setupCluster() throws Exception {
> > - System.setProperty("solr.mscheduler",
> "org.apache.solr.core.MockConcurrentMergeScheduler");
> > -
> System.setProperty(MockDirectoryFactory.SOLR_TESTS_USING_MOCK_DIRECTORY_WRAPPER,
> "true");
> > -
> > configureCluster(2)
> > .addConfig("config",
> TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
> > .configure();
> > }
> >
> > - @AfterClass
> > - public static void cleanup() {
> > - System.clearProperty("solr.mscheduler");
> > -
> System.clearProperty(MockDirectoryFactory.SOLR_TESTS_USING_MOCK_DIRECTORY_WRAPPER);
> > + @Before
> > + public void setUp() throws Exception {
> > + super.setUp();
> > + collection = getSaferTestName();
> > + cluster.getSolrClient().setDefaultCollection(collection);
> > }
> >
> > + @After
> > + public void tearDown() throws Exception {
> > + super.tearDown();
> > +
> CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
> > + }
> >
> > @Test
> > - public void test() throws Exception {
> > - final String collection = "collection1";
> > - cluster.getSolrClient().setDefaultCollection(collection);
> > + public void testLeaderFailsOver() throws Exception {
> > CollectionAdminRequest
> > .createCollection(collection, "config", 1, 2)
> > .process(cluster.getSolrClient());
> > cluster.waitForActiveCollection(collection, 1, 2);
> > - try {
> > - List<String> addedIds = new ArrayList<>();
> > - Replica oldLeader = corruptLeader(collection, addedIds);
> >
> > - waitForState("Timeout waiting for new replica become leader",
> collection, (liveNodes, collectionState) -> {
> > - Slice slice = collectionState.getSlice("shard1");
> > + UpdateResponse updateResponse = new UpdateRequest().add("id",
> "1").commit(cluster.getSolrClient(), null);
> > + assertEquals(0, updateResponse.getStatus());
> >
> > - if (slice.getReplicas().size() != 2) return false;
> > - if (slice.getLeader() == null) return false;
> > - if (slice.getLeader().getName().equals(oldLeader.getName()))
> return false;
> > + Replica oldLeader = corruptLeader(collection);
> >
> > - return true;
> > - });
> > -
> ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(),
> collection, 120000);
> > - Slice shard = getCollectionState(collection).getSlice("shard1");
> > - assertNotSame(shard.getLeader().getNodeName(),
> oldLeader.getNodeName());
> > - assertEquals(getNonLeader(shard).getNodeName(),
> oldLeader.getNodeName());
> > + waitForState("Now waiting for new replica to become leader",
> collection, (liveNodes, collectionState) -> {
> > + Slice slice = collectionState.getSlice("shard1");
> >
> > - for (String id : addedIds) {
> > - assertNotNull(cluster.getSolrClient().getById(collection,id));
> > - }
> > - if (log.isInfoEnabled()) {
> > - log.info("The test success oldLeader:{} currentState:{}",
> oldLeader, getCollectionState(collection));
> > - }
> > + if (slice.getReplicas().size() != 2) return false;
> > + if (slice.getLeader() == null) return false;
> > + if (slice.getLeader().getName().equals(oldLeader.getName()))
> return false;
> >
> > - } finally {
> > -
> CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
> > + return true;
> > + });
> > +
> ClusterStateUtil.waitForAllActiveAndLiveReplicas(cluster.getSolrClient().getZkStateReader(),
> collection, 120000);
> > + Slice shard = getCollectionState(collection).getSlice("shard1");
> > + assertNotEquals("Old leader should not be leader again",
> oldLeader.getNodeName(), shard.getLeader().getNodeName());
> > + assertEquals("Old leader should be a follower",
> oldLeader.getNodeName(), getNonLeader(shard).getNodeName());
> > +
> > + // Check that we can continue indexing after this
> > + updateResponse = new UpdateRequest().add("id",
> "2").commit(cluster.getSolrClient(), null);
> > + assertEquals(0, updateResponse.getStatus());
> > + try (SolrClient followerClient = new
> HttpSolrClient.Builder(oldLeader.getCoreUrl()).build()) {
> > + QueryResponse queryResponse = new QueryRequest(new
> SolrQuery("*:*")).process(followerClient);
> > + assertEquals(queryResponse.getResults().toString(), 2,
> queryResponse.getResults().getNumFound());
> > }
> > }
> >
> > - private Replica corruptLeader(String collection, List<String>
> addedIds) throws IOException {
> > - DocCollection dc = getCollectionState(collection);
> > - Replica oldLeader = dc.getLeader("shard1");
> > - log.info("Corrupt leader : {}", oldLeader);
> > -
> > - CoreContainer leaderCC =
> cluster.getReplicaJetty(oldLeader).getCoreContainer();
> > - SolrCore leaderCore = leaderCC.getCores().iterator().next();
> > - MockDirectoryWrapper mockDir = (MockDirectoryWrapper)
> leaderCore.getDirectoryFactory()
> > - .get(leaderCore.getIndexDir(),
> DirectoryFactory.DirContext.DEFAULT,
> leaderCore.getSolrConfig().indexConfig.lockType);
> > - leaderCore.getDirectoryFactory().release(mockDir);
> > -
> > - try (HttpSolrClient solrClient = new
> HttpSolrClient.Builder(dc.getLeader("shard1").getCoreUrl()).build()) {
> > - for (int i = 0; i < 100; i++) {
> > - new UpdateRequest()
> > - .add("id", i + "")
> > - .process(solrClient);
> > - solrClient.commit();
> > - addedIds.add(i + "");
> > -
> > - for (String file : mockDir.listAll()) {
> > - if (file.contains("segments_")) continue;
> > - if (file.endsWith("si")) continue;
> > - if (file.endsWith("fnm")) continue;
> > - if (random().nextBoolean()) continue;
> > -
> > - try {
> > - mockDir.corruptFiles(Collections.singleton(file));
> > - } catch (RuntimeException | FileNotFoundException |
> NoSuchFileException e) {
> > - // merges can lead to this exception
> > - }
> > - }
> > - }
> > - } catch (Exception e) {
> > - log.info("Corrupt leader ex: ", e);
> > -
> > - // solrClient.add/commit would throw RemoteSolrException with error code 500 or
> > - // 404(when the leader replica is already deleted by giveupLeadership)
> > - if (e instanceof RemoteSolrException) {
> > - SolrException se = (SolrException) e;
> > - assertThat(se.code(), anyOf(is(500), is(404)));
> > - } else if (!(e instanceof AlreadyClosedException)) {
> > - throw new RuntimeException("Unexpected exception", e);
> > + private Replica corruptLeader(String collection) throws IOException,
> SolrServerException {
> > + try {
> > + TestInjection.leaderTragedy = "true:100";
> > +
> > + DocCollection dc = getCollectionState(collection);
> > + Replica oldLeader = dc.getLeader("shard1");
> > + log.info("Will crash leader : {}", oldLeader);
> > +
> > + try (HttpSolrClient solrClient = new
> HttpSolrClient.Builder(dc.getLeader("shard1").getCoreUrl()).build()) {
> > + new UpdateRequest().add("id", "99").commit(solrClient, null);
> > + fail("Should have injected tragedy");
> > + } catch (RemoteSolrException e) {
> > + // solrClient.add would throw RemoteSolrException with code 500
> > + // or 404 if the bad replica has already been deleted
> > + MatcherAssert.assertThat(e.code(), anyOf(is(500), is(404)));
> > + } catch (AlreadyClosedException e) {
> > + // If giving up leadership, might be already closed/closing
> > }
> > - //else expected
> > +
> > + return oldLeader;
> > + } finally {
> > + TestInjection.leaderTragedy = null;
> > }
> > - return oldLeader;
> > }
> >
> > private Replica getNonLeader(Slice slice) {
> > @@ -165,8 +140,6 @@ public class LeaderTragicEventTest extends SolrCloudTestCase {
> >
> > @Test
> > public void testOtherReplicasAreNotActive() throws Exception {
> > - final String collection = "collection2";
> > - cluster.getSolrClient().setDefaultCollection(collection);
> > int numReplicas = random().nextInt(2) + 1;
> > // won't do anything if leader is the only one active replica in the shard
> > CollectionAdminRequest
> > @@ -174,7 +147,6 @@ public class LeaderTragicEventTest extends SolrCloudTestCase {
> > .process(cluster.getSolrClient());
> > cluster.waitForActiveCollection(collection, 1, numReplicas);
> >
> > - try {
> > JettySolrRunner otherReplicaJetty = null;
> > if (numReplicas == 2) {
> > Slice shard = getCollectionState(collection).getSlice("shard1");
> > @@ -187,7 +159,7 @@ public class LeaderTragicEventTest extends
> SolrCloudTestCase {
> > waitForState("Timeout waiting for replica get down",
> collection, (liveNodes, collectionState) ->
> getNonLeader(collectionState.getSlice("shard1")).getState() !=
> Replica.State.ACTIVE);
> > }
> >
> > - Replica oldLeader = corruptLeader(collection, new ArrayList<>());
> > + Replica oldLeader = corruptLeader(collection);
> >
> > if (otherReplicaJetty != null) {
> > otherReplicaJetty.start();
> > @@ -196,9 +168,6 @@ public class LeaderTragicEventTest extends
> SolrCloudTestCase {
> >
> > Replica leader =
> getCollectionState(collection).getSlice("shard1").getLeader();
> > assertEquals(leader.getName(), oldLeader.getName());
> > - } finally {
> > -
> CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
> > - }
> > }
> >
> >
> > diff --git
> a/solr/core/src/test/org/apache/solr/cloud/ShardTermsTest.java
> b/solr/core/src/test/org/apache/solr/cloud/ShardTermsTest.java
> > new file mode 100644
> > index 0000000..ec20cec
> > --- /dev/null
> > +++ b/solr/core/src/test/org/apache/solr/cloud/ShardTermsTest.java
> > @@ -0,0 +1,48 @@
> > +/*
> > + * Licensed to the Apache Software Foundation (ASF) under one or more
> > + * contributor license agreements. See the NOTICE file distributed with
> > + * this work for additional information regarding copyright ownership.
> > + * The ASF licenses this file to You under the Apache License, Version 2.0
> > + * (the "License"); you may not use this file except in compliance with
> > + * the License. You may obtain a copy of the License at
> > + *
> > + * http://www.apache.org/licenses/LICENSE-2.0
> > + *
> > + * Unless required by applicable law or agreed to in writing, software
> > + * distributed under the License is distributed on an "AS IS" BASIS,
> > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> > + * See the License for the specific language governing permissions and
> > + * limitations under the License.
> > + */
> > +
> > +package org.apache.solr.cloud;
> > +
> > +import org.apache.solr.SolrTestCase;
> > +import org.apache.solr.client.solrj.cloud.ShardTerms;
> > +import org.junit.Test;
> > +
> > +import java.util.Collections;
> > +import java.util.HashMap;
> > +import java.util.Map;
> > +
> > +public class ShardTermsTest extends SolrTestCase {
> > + @Test
> > + public void testIncreaseTerms() {
> > + Map<String, Long> map = new HashMap<>();
> > + map.put("leader", 0L);
> > + ShardTerms terms = new ShardTerms(map, 0);
> > + terms = terms.increaseTerms("leader",
> Collections.singleton("replica"));
> > + assertEquals(1L, terms.getTerm("leader").longValue());
> > +
> > + map.put("leader", 2L);
> > + map.put("live-replica", 2L);
> > + map.put("dead-replica", 1L);
> > + terms = new ShardTerms(map, 0);
> > + assertNull(terms.increaseTerms("leader",
> Collections.singleton("dead-replica")));
> > +
> > + terms = terms.increaseTerms("leader",
> Collections.singleton("leader"));
> > + assertEquals(3L, terms.getTerm("live-replica").longValue());
> > + assertEquals(2L, terms.getTerm("leader").longValue());
> > + assertEquals(1L, terms.getTerm("dead-replica").longValue());
> > + }
> > +}
> > diff --git
> a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
> b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
> > index 56ed8ae7..452c0da 100644
> > --- a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
> > +++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
> > @@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicInteger;
> > import java.util.function.Supplier;
> >
> > import org.apache.solr.client.solrj.SolrServerException;
> > -import org.apache.solr.client.solrj.cloud.ShardTerms;
> > import org.apache.solr.client.solrj.request.CollectionAdminRequest;
> > import org.apache.solr.common.util.TimeSource;
> > import org.apache.solr.util.TimeOut;
> > @@ -265,13 +264,6 @@ public class ZkShardTermsTest extends
> SolrCloudTestCase {
> > replicaTerms.close();
> > }
> >
> > - public void testEnsureTermsIsHigher() {
> > - Map<String, Long> map = new HashMap<>();
> > - map.put("leader", 0L);
> > - ShardTerms terms = new ShardTerms(map, 0);
> > - terms = terms.increaseTerms("leader",
> Collections.singleton("replica"));
> > - assertEquals(1L, terms.getTerm("leader").longValue());
> > - }
> >
> > public void testSetTermToZero() {
> > String collection = "setTermToZero";
> > diff --git
> a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java
> b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java
> > index 3b2f754..cd6ead0 100644
> > ---
> a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java
> > +++
> b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java
> > @@ -73,7 +73,6 @@ public class ShardTerms implements MapWriter {
> > */
> > public boolean haveHighestTermValue(String coreNodeName) {
> > if (values.isEmpty()) return true;
> > - long maxTerm = Collections.max(values.values());
> > return values.getOrDefault(coreNodeName, 0L) == maxTerm;
> > }
> >
> > @@ -92,7 +91,7 @@ public class ShardTerms implements MapWriter {
> > throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can not find leader's term " + leader);
> > }
> >
> > - boolean changed = false;
> > + boolean saveChanges = false;
> > boolean foundReplicasInLowerTerms = false;
> >
> > HashMap<String, Long> newValues = new HashMap<>(values);
> > @@ -102,16 +101,16 @@ public class ShardTerms implements MapWriter {
> > if (replicasNeedingRecovery.contains(key)) foundReplicasInLowerTerms = true;
> > if (Objects.equals(entry.getValue(), leaderTerm)) {
> > if(skipIncreaseTermOf(key, replicasNeedingRecovery)) {
> > - changed = true;
> > + saveChanges = true; // if we don't skip anybody, then there's no reason to increment
> > } else {
> > - newValues.put(key, leaderTerm+1);
> > + entry.setValue(leaderTerm + 1);
> > }
> > }
> > }
> >
> > // We should skip the optimization if there are no replicasNeedingRecovery present in local terms,
> > // this may indicate that the current value is stale
> > - if (!changed && foundReplicasInLowerTerms) return null;
> > + if (!saveChanges && foundReplicasInLowerTerms) return null;
> > return new ShardTerms(newValues, version);
> > }
> >
> > @@ -167,6 +166,12 @@ public class ShardTerms implements MapWriter {
> > return new ShardTerms(newValues, version);
> > }
> >
> > + /**
> > + * Return a new {@link ShardTerms} in which the associate term of {@code coreNodeName} is equal to zero,
> > + * creating it if it does not previously exist.
> > + * @param coreNodeName of the replica
> > + * @return null if the term of {@code coreNodeName} already exists and is zero
> > + */
> > public ShardTerms setTermToZero(String coreNodeName) {
> > if (values.getOrDefault(coreNodeName, -1L) == 0) {
> > return null;
> > @@ -182,7 +187,6 @@ public class ShardTerms implements MapWriter {
> > * @return null if term of {@code coreNodeName} is already maximum
> > */
> > public ShardTerms setTermEqualsToLeader(String coreNodeName) {
> > - long maxTerm = getMaxTerm();
> > if (values.get(coreNodeName) == maxTerm) return null;
> >
> > HashMap<String, Long> newValues = new HashMap<>(values);
> > @@ -201,7 +205,6 @@ public class ShardTerms implements MapWriter {
> > * @return null if {@code coreNodeName} is already marked as doing recovering
> > */
> > public ShardTerms startRecovering(String coreNodeName) {
> > - long maxTerm = getMaxTerm();
> > if (values.get(coreNodeName) == maxTerm)
> > return null;
> >
> > @@ -246,7 +249,7 @@ public class ShardTerms implements MapWriter {
> > return version;
> > }
> >
> > - public Map<String , Long> getTerms() {
> > + public Map<String, Long> getTerms() {
> > return new HashMap<>(this.values);
> > }
> >
> >
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@lucene.apache.org
> For additional commands, e-mail: dev-help@lucene.apache.org
>
>