You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/01/29 08:58:08 UTC
[2/2] lucene-solr:branch_7x: SOLR-11702: Redesign current LIR
implementation
SOLR-11702: Redesign current LIR implementation
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/8c8d78a4
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/8c8d78a4
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/8c8d78a4
Branch: refs/heads/branch_7x
Commit: 8c8d78a4bb6c0f3322471af5765a01848247409c
Parents: 16e80e6
Author: Cao Manh Dat <da...@apache.org>
Authored: Mon Jan 29 15:55:28 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Mon Jan 29 15:57:45 2018 +0700
----------------------------------------------------------------------
solr/CHANGES.txt | 6 +
.../client/solrj/embedded/JettySolrRunner.java | 16 +-
.../org/apache/solr/cloud/ElectionContext.java | 48 +-
.../cloud/LeaderInitiatedRecoveryThread.java | 1 +
.../solr/cloud/RecoveringCoreTermWatcher.java | 75 +++
.../org/apache/solr/cloud/RecoveryStrategy.java | 75 ++-
.../apache/solr/cloud/ZkCollectionTerms.java | 65 +++
.../org/apache/solr/cloud/ZkController.java | 112 +++--
.../org/apache/solr/cloud/ZkShardTerms.java | 475 +++++++++++++++++++
.../api/collections/CreateCollectionCmd.java | 18 +-
.../solr/handler/admin/CollectionsHandler.java | 45 +-
.../solr/handler/admin/PrepRecoveryOp.java | 7 +
.../solr/update/DefaultSolrCoreState.java | 19 +-
.../processor/DistributedUpdateProcessor.java | 86 +++-
.../org/apache/solr/cloud/ForceLeaderTest.java | 220 +++++++--
.../solr/cloud/HttpPartitionOnCommitTest.java | 178 +++++++
.../apache/solr/cloud/HttpPartitionTest.java | 179 +++++--
.../solr/cloud/LIRRollingUpdatesTest.java | 457 ++++++++++++++++++
.../LeaderInitiatedRecoveryOnCommitTest.java | 178 -------
...aderInitiatedRecoveryOnShardRestartTest.java | 12 +-
.../TestLeaderInitiatedRecoveryThread.java | 1 +
.../org/apache/solr/cloud/ZkShardTermsTest.java | 204 ++++++++
.../solr/update/TestInPlaceUpdatesDistrib.java | 33 +-
.../cloud/AbstractFullDistribZkTestBase.java | 5 +-
24 files changed, 2112 insertions(+), 403 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index fd98ce6..672f881 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -51,6 +51,10 @@ Upgrade Notes
Before 7.3, the copied over configset was named the same as the collection name, but 7.3 onwards it will be named
with an additional ".AUTOCREATED" suffix.
+* SOLR-11702: The old LIR implementation (SOLR-5495) is now deprecated and replaced.
+ Solr will support rolling upgrades from old 7.x versions of Solr to the new one until
+ the last release of the 7.x major version.
+
New Features
----------------------
* SOLR-11285: Simulation framework for autoscaling. (ab)
@@ -93,6 +97,8 @@ New Features
* SOLR-11617: Alias metadata is now mutable via a new MODIFYALIAS command. Metadata is returned from LISTALIASES.
(Gus Heck via David Smiley)
+* SOLR-11702: Redesign current LIR implementation (Cao Manh Dat, shalin)
+
Bug Fixes
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index e5b81f8..23a8dc1 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -84,7 +84,7 @@ public class JettySolrRunner {
FilterHolder debugFilter;
private boolean waitOnSolr = false;
- private int lastPort = -1;
+ private int jettyPort = -1;
private final JettyConfig config;
private final String solrHome;
@@ -280,8 +280,10 @@ public class JettySolrRunner {
@Override
public void lifeCycleStarted(LifeCycle arg0) {
- lastPort = getFirstConnectorPort();
- nodeProperties.setProperty("hostPort", Integer.toString(lastPort));
+ jettyPort = getFirstConnectorPort();
+ int port = jettyPort;
+ if (proxyPort != -1) port = proxyPort;
+ nodeProperties.setProperty("hostPort", Integer.toString(port));
nodeProperties.setProperty("hostContext", config.context);
root.getServletContext().setAttribute(SolrDispatchFilter.PROPERTIES_ATTRIBUTE, nodeProperties);
@@ -384,7 +386,7 @@ public class JettySolrRunner {
// if started before, make a new server
if (startedBefore) {
waitOnSolr = false;
- int port = reusePort ? lastPort : this.config.port;
+ int port = reusePort ? jettyPort : this.config.port;
init(port);
} else {
startedBefore = true;
@@ -456,7 +458,7 @@ public class JettySolrRunner {
if (0 == conns.length) {
throw new RuntimeException("Jetty Server has no Connectors");
}
- return (proxyPort != -1) ? proxyPort : ((ServerConnector) conns[0]).getLocalPort();
+ return ((ServerConnector) conns[0]).getLocalPort();
}
/**
@@ -465,10 +467,10 @@ public class JettySolrRunner {
* @exception RuntimeException if there is no Connector
*/
public int getLocalPort() {
- if (lastPort == -1) {
+ if (jettyPort == -1) {
throw new IllegalStateException("You cannot get the port until this instance has started");
}
- return (proxyPort != -1) ? proxyPort : lastPort;
+ return (proxyPort != -1) ? proxyPort : jettyPort;
}
/**
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index 7169ea8..2d00151 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.Future;
@@ -491,7 +492,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
rejoinLeaderElection(core);
}
}
-
+
if (isLeader) {
// check for any replicas in my shard that were set to down by the previous leader
try {
@@ -530,6 +531,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
return docCollection.getReplica(replicaName);
}
+ @Deprecated
public void checkLIR(String coreName, boolean allReplicasInLine)
throws InterruptedException, KeeperException, IOException {
if (allReplicasInLine) {
@@ -551,7 +553,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
leaderProps.getStr(ZkStateReader.CORE_NODE_NAME_PROP), Replica.State.ACTIVE, core.getCoreDescriptor(), true);
}
}
-
+
} else {
try (SolrCore core = cc.getCore(coreName)) {
if (core != null) {
@@ -567,7 +569,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
}
}
-
+
+ @Deprecated
private void startLeaderInitiatedRecoveryOnReplicas(String coreName) throws Exception {
try (SolrCore core = cc.getCore(coreName)) {
CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
@@ -577,10 +580,10 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
if (coll == null || shardId == null) {
log.error("Cannot start leader-initiated recovery on new leader (core="+
- coreName+",coreNodeName=" + coreNodeName + ") because collection and/or shard is null!");
+ coreName+",coreNodeName=" + coreNodeName + ") because collection and/or shard is null!");
return;
}
-
+
String znodePath = zkController.getLeaderInitiatedRecoveryZnodePath(coll, shardId);
List<String> replicas = null;
try {
@@ -588,21 +591,28 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
} catch (NoNodeException nne) {
// this can be ignored
}
-
+
if (replicas != null && replicas.size() > 0) {
for (String replicaCoreNodeName : replicas) {
-
+
if (coreNodeName.equals(replicaCoreNodeName))
continue; // added safe-guard so we don't mark this core as down
-
+
+ if (zkController.getShardTerms(collection, shardId).registered(replicaCoreNodeName)) {
+ // the replica registered its term so it is running with the new LIR implementation
+ // we can put this replica into recovery by increasing our term
+ zkController.getShardTerms(collection, shardId).ensureTermsIsHigher(coreNodeName, Collections.singleton(replicaCoreNodeName));
+ continue;
+ }
+
final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(coll, shardId, replicaCoreNodeName);
if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERY_FAILED) {
log.info("After core={} coreNodeName={} was elected leader, a replica coreNodeName={} was found in state: "
+ lirState.toString() + " and needing recovery.", coreName, coreNodeName, replicaCoreNodeName);
- List<ZkCoreNodeProps> replicaProps =
+ List<ZkCoreNodeProps> replicaProps =
zkController.getZkStateReader().getReplicaProps(collection, shardId, coreNodeName);
-
- if (replicaProps != null && replicaProps.size() > 0) {
+
+ if (replicaProps != null && replicaProps.size() > 0) {
ZkCoreNodeProps coreNodeProps = null;
for (ZkCoreNodeProps p : replicaProps) {
if (((Replica)p.getNodeProps()).getName().equals(replicaCoreNodeName)) {
@@ -610,17 +620,18 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
break;
}
}
-
+
zkController.ensureReplicaInLeaderInitiatedRecovery(cc,
collection, shardId, coreNodeProps, core.getCoreDescriptor(),
false /* forcePublishState */);
- }
+ }
}
}
}
- } // core gets closed automagically
+ } // core gets closed automagically
}
+
// returns true if all replicas are found to be up, false if not
private boolean waitForReplicasToComeUp(int timeoutms) throws InterruptedException {
long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS);
@@ -743,7 +754,14 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
// to make sure others participate in sync and leader election, we can be leader
return true;
}
-
+
+ String coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
+ if (zkController.getShardTerms(collection, shardId).registered(coreNodeName)
+ && !zkController.getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) {
+ log.info("Can't become leader, term of replica {} less than leader", coreNodeName);
+ return false;
+ }
+
if (core.getCoreDescriptor().getCloudDescriptor().getLastPublished() == Replica.State.ACTIVE) {
log.debug("My last published State was Active, it's okay to be the leader.");
return true;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java b/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
index 8c892ce..9c46236 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
@@ -45,6 +45,7 @@ import java.util.List;
* replica; used by a shard leader to nag a replica into recovering after the
* leader experiences an error trying to send an update request to the replica.
*/
+@Deprecated
public class LeaderInitiatedRecoveryThread extends Thread {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
new file mode 100644
index 0000000..26fec97
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.solr.core.SolrCore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Start recovery of a core if its term is less than the leader's term
+ */
+public class RecoveringCoreTermWatcher implements ZkShardTerms.CoreTermWatcher {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final SolrCore solrCore;
+ // used to prevent redoing recovery when the terms of other replicas change;
+ // the idea here is that for a specific term of a replica, we only do recovery once
+ private final AtomicLong lastTermDoRecovery;
+
+ RecoveringCoreTermWatcher(SolrCore solrCore) {
+ this.solrCore = solrCore;
+ this.lastTermDoRecovery = new AtomicLong(-1);
+ }
+
+ @Override
+ public boolean onTermChanged(ZkShardTerms.Terms terms) {
+ if (solrCore.isClosed()) {
+ return false;
+ }
+
+ if (solrCore.getCoreDescriptor() == null || solrCore.getCoreDescriptor().getCloudDescriptor() == null) return true;
+
+ String coreNodeName = solrCore.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
+ if (terms.canBecomeLeader(coreNodeName)) return true;
+ if (lastTermDoRecovery.get() < terms.getTerm(coreNodeName)) {
+ log.info("Start recovery on {} because core's term is less than leader's term", coreNodeName);
+ lastTermDoRecovery.set(terms.getTerm(coreNodeName));
+ solrCore.getUpdateHandler().getSolrCoreState().doRecovery(solrCore.getCoreContainer(), solrCore.getCoreDescriptor());
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ RecoveringCoreTermWatcher that = (RecoveringCoreTermWatcher) o;
+
+ return solrCore.equals(that.solrCore);
+ }
+
+ @Override
+ public int hashCode() {
+ return solrCore.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 3ab4eca..63dfe19 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -35,8 +35,10 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient.HttpUriRequestResponse;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.SolrPingResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
@@ -458,7 +460,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
core.getCoreDescriptor());
return;
}
-
+
// we temporary ignore peersync for tlog replicas
boolean firstTime = replicaType != Replica.Type.TLOG;
@@ -516,21 +518,18 @@ public class RecoveryStrategy implements Runnable, Closeable {
zkController.stopReplicationFromLeader(coreName);
}
+ final String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
Future<RecoveryInfo> replayFuture = null;
while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
try {
CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
- ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
- cloudDesc.getCollectionName(), cloudDesc.getShardId());
-
- final String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
- final String leaderCoreName = leaderprops.getStr(ZkStateReader.CORE_NAME_PROP);
-
- String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
-
- String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
+ final Replica leader = pingLeader(ourUrl, core.getCoreDescriptor(), true);
+ if (isClosed()) {
+ LOG.info("RecoveryStrategy has been closed");
+ break;
+ }
- boolean isLeader = leaderUrl.equals(ourUrl);
+ boolean isLeader = leader.getCoreUrl().equals(ourUrl);
if (isLeader && !cloudDesc.isLeader()) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
}
@@ -541,12 +540,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
return;
}
-
+
LOG.info("Begin buffering updates. core=[{}]", coreName);
ulog.bufferUpdates();
replayed = false;
- LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leaderUrl,
+ LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leader.getCoreUrl(),
ourUrl);
zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
@@ -565,7 +564,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
break;
}
- sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName, slice);
+ sendPrepRecoveryCmd(leader.getBaseUrl(), leader.getCoreName(), slice);
if (isClosed()) {
LOG.info("RecoveryStrategy has been closed");
@@ -585,11 +584,11 @@ public class RecoveryStrategy implements Runnable, Closeable {
// first thing we just try to sync
if (firstTime) {
firstTime = false; // only try sync the first time through the loop
- LOG.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leaderUrl, recoveringAfterStartup);
+ LOG.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leader.getCoreUrl(), recoveringAfterStartup);
// System.out.println("Attempting to PeerSync from " + leaderUrl
// + " i am:" + zkController.getNodeName());
PeerSync peerSync = new PeerSync(core,
- Collections.singletonList(leaderUrl), ulog.getNumRecordsToKeep(), false, false);
+ Collections.singletonList(leader.getCoreUrl()), ulog.getNumRecordsToKeep(), false, false);
peerSync.setStartingVersions(recentVersions);
boolean syncSuccess = peerSync.sync().isSuccess();
if (syncSuccess) {
@@ -623,7 +622,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
try {
- replicate(zkController.getNodeName(), core, leaderprops);
+ replicate(zkController.getNodeName(), core, leader);
if (isClosed()) {
LOG.info("RecoveryStrategy has been closed");
@@ -745,6 +744,48 @@ public class RecoveryStrategy implements Runnable, Closeable {
LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
}
+ private final Replica pingLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown) throws Exception {
+ int numTried = 0;
+ while (true) {
+ CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
+ DocCollection docCollection = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName());
+ if (mayPutReplicaAsDown && numTried == 1 &&
+ docCollection.getReplica(coreDesc.getCloudDescriptor().getCoreNodeName()).getState() == Replica.State.ACTIVE) {
+ // this operation may take a long time; by putting the replica into the DOWN state, clients won't query this replica
+ zkController.publish(coreDesc, Replica.State.DOWN);
+ }
+ numTried++;
+ final Replica leaderReplica = zkStateReader.getLeaderRetry(
+ cloudDesc.getCollectionName(), cloudDesc.getShardId());
+
+ if (isClosed()) {
+ return leaderReplica;
+ }
+
+ if (leaderReplica.getCoreUrl().equals(ourUrl)) {
+ return leaderReplica;
+ }
+
+ try (HttpSolrClient httpSolrClient = new HttpSolrClient.Builder(leaderReplica.getCoreUrl())
+ .withSocketTimeout(1000)
+ .withConnectionTimeout(1000)
+ .build()) {
+ SolrPingResponse resp = httpSolrClient.ping();
+ return leaderReplica;
+ } catch (IOException e) {
+ LOG.info("Failed to connect leader {} on recovery, try again", leaderReplica.getBaseUrl(), e);
+ Thread.sleep(500);
+ } catch (Exception e) {
+ if (e.getCause() instanceof IOException) {
+ LOG.info("Failed to connect leader {} on recovery, try again", leaderReplica.getBaseUrl(), e);
+ Thread.sleep(500);
+ } else {
+ return leaderReplica;
+ }
+ }
+ }
+ }
+
public static Runnable testing_beforeReplayBufferingUpdates;
final private Future<RecoveryInfo> replay(SolrCore core)
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
new file mode 100644
index 0000000..b232f9b
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.core.CoreDescriptor;
+
+/**
+ * Used to manage all ZkShardTerms of a collection
+ */
+class ZkCollectionTerms implements AutoCloseable {
+ private final String collection;
+ private final Map<String, ZkShardTerms> terms;
+ private final SolrZkClient zkClient;
+
+ ZkCollectionTerms(String collection, SolrZkClient client) {
+ this.collection = collection;
+ this.terms = new HashMap<>();
+ this.zkClient = client;
+ ObjectReleaseTracker.track(this);
+ }
+
+
+ public ZkShardTerms getShard(String shardId) {
+ synchronized (terms) {
+ if (!terms.containsKey(shardId)) terms.put(shardId, new ZkShardTerms(collection, shardId, zkClient));
+ return terms.get(shardId);
+ }
+ }
+
+ public void remove(String shardId, CoreDescriptor coreDescriptor) {
+ synchronized (terms) {
+ if (getShard(shardId).removeTerm(coreDescriptor)) {
+ terms.remove(shardId).close();
+ }
+ }
+ }
+
+ public void close() {
+ synchronized (terms) {
+ terms.values().forEach(ZkShardTerms::close);
+ }
+ ObjectReleaseTracker.release(this);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 365da65..7898e96 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -193,7 +193,6 @@ public class ZkController {
private final Map<ContextKey, ElectionContext> electionContexts = Collections.synchronizedMap(new HashMap<>());
private final SolrZkClient zkClient;
- private final ZkCmdExecutor cmdExecutor;
public final ZkStateReader zkStateReader;
private SolrCloudManager cloudManager;
private CloudSolrClient cloudSolrClient;
@@ -210,6 +209,7 @@ public class ZkController {
private LeaderElector overseerElector;
private Map<String, ReplicateFromLeader> replicateFromLeaders = new ConcurrentHashMap<>();
+ private final Map<String, ZkCollectionTerms> collectionToTerms = new HashMap<>();
// for now, this can be null in tests, in which case recovery will be inactive, and other features
// may accept defaults or use mocks rather than pulling things from a CoreContainer
@@ -226,6 +226,7 @@ public class ZkController {
private volatile boolean isClosed;
+ @Deprecated
// keeps track of replicas that have been asked to recover by leaders running on this node
private final Map<String, String> replicasInLeaderInitiatedRecovery = new HashMap<String, String>();
@@ -323,7 +324,7 @@ public class ZkController {
@Override
public void command() {
log.info("ZooKeeper session re-connected ... refreshing core states after session expiration.");
-
+ clearZkCollectionTerms();
try {
zkStateReader.createClusterStateWatchersAndUpdate();
@@ -435,7 +436,6 @@ public class ZkController {
this.overseerRunningMap = Overseer.getRunningMap(zkClient);
this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
this.overseerFailureMap = Overseer.getFailureMap(zkClient);
- cmdExecutor = new ZkCmdExecutor(clientTimeout);
zkStateReader = new ZkStateReader(zkClient, () -> {
if (cc != null) cc.securityNodeChanged();
});
@@ -547,6 +547,9 @@ public class ZkController {
*/
public void close() {
this.isClosed = true;
+ synchronized (collectionToTerms) {
+ collectionToTerms.values().forEach(ZkCollectionTerms::close);
+ }
try {
for (ElectionContext context : electionContexts.values()) {
try {
@@ -1034,7 +1037,14 @@ public class ZkController {
final String coreZkNodeName = desc.getCloudDescriptor().getCoreNodeName();
assert coreZkNodeName != null : "we should have a coreNodeName by now";
-
+
+ ZkShardTerms shardTerms = getShardTerms(collection, cloudDesc.getShardId());
+
+ // This flag is used for testing rolling updates and should be removed in SOLR-11812
+ boolean isRunningInNewLIR = "new".equals(desc.getCoreProperty("lirVersion", "new"));
+ if (isRunningInNewLIR) {
+ shardTerms.registerTerm(coreZkNodeName);
+ }
String shardId = cloudDesc.getShardId();
Map<String,Object> props = new HashMap<>();
// we only put a subset of props into the leader node
@@ -1118,15 +1128,17 @@ public class ZkController {
}
}
boolean didRecovery
- = checkRecovery(recoverReloadedCores, isLeader, skipRecovery, collection, coreZkNodeName, core, cc, afterExpiration);
+ = checkRecovery(recoverReloadedCores, isLeader, skipRecovery, collection, coreZkNodeName, shardId, core, cc, afterExpiration);
if (!didRecovery) {
if (isTlogReplicaAndNotLeader) {
startReplicationFromLeader(coreName, true);
}
publish(desc, Replica.State.ACTIVE);
}
-
-
+
+ if (isRunningInNewLIR && replicaType != Type.PULL) {
+ shardTerms.addListener(new RecoveringCoreTermWatcher(core));
+ }
core.getCoreDescriptor().getCloudDescriptor().setHasRegistered(true);
}
@@ -1295,7 +1307,7 @@ public class ZkController {
* Returns whether or not a recovery was started
*/
private boolean checkRecovery(boolean recoverReloadedCores, final boolean isLeader, boolean skipRecovery,
- final String collection, String shardId,
+ final String collection, String coreZkNodeName, String shardId,
SolrCore core, CoreContainer cc, boolean afterExpiration) {
if (SKIP_AUTO_RECOVERY) {
log.warn("Skipping recovery according to sys prop solrcloud.skip.autorecovery");
@@ -1322,6 +1334,13 @@ public class ZkController {
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
return true;
}
+
+ ZkShardTerms zkShardTerms = getShardTerms(collection, shardId);
+ if (zkShardTerms.registered(coreZkNodeName) && !zkShardTerms.canBecomeLeader(coreZkNodeName)) {
+ log.info("Leader's term larger than core " + core.getName() + "; starting recovery process");
+ core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
+ return true;
+ }
} else {
log.info("I am the leader, no recovery necessary");
}
@@ -1372,6 +1391,7 @@ public class ZkController {
String shardId = cd.getCloudDescriptor().getShardId();
String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
+
// If the leader initiated recovery, then verify that this replica has performed
// recovery as requested before becoming active; don't even look at lirState if going down
if (state != Replica.State.DOWN) {
@@ -1394,7 +1414,7 @@ public class ZkController {
}
}
}
-
+
Map<String,Object> props = new HashMap<>();
props.put(Overseer.QUEUE_OPERATION, "state");
props.put(ZkStateReader.STATE_PROP, state.toString());
@@ -1430,6 +1450,11 @@ public class ZkController {
log.info("The core '{}' had failed to initialize before.", cd.getName());
}
+ // This flag is used for testing rolling updates and should be removed in SOLR-11812
+ boolean isRunningInNewLIR = "new".equals(cd.getCoreProperty("lirVersion", "new"));
+ if (state == Replica.State.RECOVERING && isRunningInNewLIR) {
+ getShardTerms(collection, shardId).setEqualsToMax(coreNodeName);
+ }
ZkNodeProps m = new ZkNodeProps(props);
if (updateLastState) {
@@ -1441,23 +1466,28 @@ public class ZkController {
}
}
- private boolean needsToBeAssignedShardId(final CoreDescriptor desc,
- final ClusterState state, final String coreNodeName) {
-
- final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
+ public ZkShardTerms getShardTerms(String collection, String shardId) {
+ return getCollectionTerms(collection).getShard(shardId);
+ }
- final String shardId = state.getShardId(getNodeName(), desc.getName());
+ private ZkCollectionTerms getCollectionTerms(String collection) {
+ synchronized (collectionToTerms) {
+ if (!collectionToTerms.containsKey(collection)) collectionToTerms.put(collection, new ZkCollectionTerms(collection, zkClient));
+ return collectionToTerms.get(collection);
+ }
+ }
- if (shardId != null) {
- cloudDesc.setShardId(shardId);
- return false;
+ public void clearZkCollectionTerms() {
+ synchronized (collectionToTerms) {
+ collectionToTerms.values().forEach(ZkCollectionTerms::close);
+ collectionToTerms.clear();
}
- return true;
}
public void unregister(String coreName, CoreDescriptor cd) throws Exception {
final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
final String collection = cd.getCloudDescriptor().getCollectionName();
+ getCollectionTerms(collection).remove(cd.getCloudDescriptor().getShardId(), cd);
if (Strings.isNullOrEmpty(collection)) {
log.error("No collection was specified.");
@@ -1733,7 +1763,7 @@ public class ZkController {
boolean isLeader = leaderProps.getCoreUrl().equals(ourUrl);
if (!isLeader && !SKIP_AUTO_RECOVERY) {
- // detect if this core is in leader-initiated recovery and if so,
+ // detect if this core is in leader-initiated recovery and if so,
// then we don't need the leader to wait on seeing the down state
Replica.State lirState = null;
try {
@@ -1743,9 +1773,9 @@ public class ZkController {
" is in leader-initiated recovery due to: " + exc, exc);
}
- if (lirState != null) {
- log.debug("Replica " + myCoreNodeName +
- " is already in leader-initiated recovery, so not waiting for leader to see down state.");
+ if (lirState != null || !getShardTerms(collection, shard).canBecomeLeader(myCoreNodeName)) {
+ log.debug("Term of replica " + myCoreNodeName +
+ " is already less than leader, so not waiting for leader to see down state.");
} else {
log.info("Replica " + myCoreNodeName +
@@ -2055,6 +2085,7 @@ public class ZkController {
* false means the node is not live either, so no point in trying to send recovery commands
* to it.
*/
+ @Deprecated
public boolean ensureReplicaInLeaderInitiatedRecovery(
final CoreContainer container,
final String collection, final String shardId, final ZkCoreNodeProps replicaCoreProps,
@@ -2117,13 +2148,14 @@ public class ZkController {
" is not live, so skipping leader-initiated recovery for replica: core={} coreNodeName={}",
replicaCoreProps.getCoreName(), replicaCoreNodeName);
// publishDownState will be false to avoid publishing the "down" state too many times
- // as many errors can occur together and will each call into this method (SOLR-6189)
+ // as many errors can occur together and will each call into this method (SOLR-6189)
}
}
return nodeIsLive;
}
+ @Deprecated
public boolean isReplicaInRecoveryHandling(String replicaUrl) {
boolean exists = false;
synchronized (replicasInLeaderInitiatedRecovery) {
@@ -2132,12 +2164,14 @@ public class ZkController {
return exists;
}
+ @Deprecated
public void removeReplicaFromLeaderInitiatedRecoveryHandling(String replicaUrl) {
synchronized (replicasInLeaderInitiatedRecovery) {
replicasInLeaderInitiatedRecovery.remove(replicaUrl);
}
}
+ @Deprecated
public Replica.State getLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName) {
final Map<String, Object> stateObj = getLeaderInitiatedRecoveryStateObject(collection, shardId, coreNodeName);
if (stateObj == null) {
@@ -2147,6 +2181,7 @@ public class ZkController {
return stateStr == null ? null : Replica.State.getState(stateStr);
}
+ @Deprecated
public Map<String, Object> getLeaderInitiatedRecoveryStateObject(String collection, String shardId, String coreNodeName) {
if (collection == null || shardId == null || coreNodeName == null)
@@ -2191,6 +2226,7 @@ public class ZkController {
return stateObj;
}
+ @Deprecated
public void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName,
Replica.State state, CoreDescriptor leaderCd, boolean retryOnConnLoss) {
if (collection == null || shardId == null || coreNodeName == null) {
@@ -2199,12 +2235,12 @@ public class ZkController {
+ "; shardId=" + shardId + "; coreNodeName=" + coreNodeName);
return; // if we don't have complete data about a core in cloud mode, do nothing
}
-
+
assert leaderCd != null;
assert leaderCd.getCloudDescriptor() != null;
String leaderCoreNodeName = leaderCd.getCloudDescriptor().getCoreNodeName();
-
+
String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName);
if (state == Replica.State.ACTIVE) {
@@ -2269,29 +2305,29 @@ public class ZkController {
private void markShardAsDownIfLeader(String collection, String shardId, CoreDescriptor leaderCd,
String znodePath, byte[] znodeData,
boolean retryOnConnLoss) throws KeeperException, InterruptedException {
-
+
if (!leaderCd.getCloudDescriptor().isLeader()) {
log.info("No longer leader, aborting attempt to mark shard down as part of LIR");
throw new NotLeaderException(ErrorCode.SERVER_ERROR, "Locally, we do not think we are the leader.");
}
-
+
ContextKey key = new ContextKey(collection, leaderCd.getCloudDescriptor().getCoreNodeName());
ElectionContext context = electionContexts.get(key);
-
+
// we make sure we locally think we are the leader before and after getting the context - then
// we only try zk if we still think we are the leader and have our leader context
if (context == null || !leaderCd.getCloudDescriptor().isLeader()) {
log.info("No longer leader, aborting attempt to mark shard down as part of LIR");
throw new NotLeaderException(ErrorCode.SERVER_ERROR, "Locally, we do not think we are the leader.");
}
-
+
// we think we are the leader - get the expected shard leader version
// we use this version and multi to ensure *only* the current zk registered leader
// for a shard can put a replica into LIR
-
+
Integer leaderZkNodeParentVersion = ((ShardLeaderElectionContextBase)context).getLeaderZkNodeParentVersion();
-
+
// TODO: should we do this optimistically to avoid races?
if (zkClient.exists(znodePath, retryOnConnLoss)) {
List<Op> ops = new ArrayList<>(2);
@@ -2306,7 +2342,7 @@ public class ZkController {
} catch (KeeperException.NodeExistsException nee) {
// if it exists, that's great!
}
-
+
// we only create the entry if the context we are using is registered as the current leader in ZK
List<Op> ops = new ArrayList<>(2);
ops.add(Op.check(new org.apache.hadoop.fs.Path(((ShardLeaderElectionContextBase)context).leaderPath).getParent().toString(), leaderZkNodeParentVersion));
@@ -2316,11 +2352,13 @@ public class ZkController {
}
}
- public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId) {
+ @Deprecated
+ public static String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId) {
return "/collections/" + collection + "/leader_initiated_recovery/" + shardId;
}
- public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId, String coreNodeName) {
+ @Deprecated
+ public static String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId, String coreNodeName) {
return getLeaderInitiatedRecoveryZnodePath(collection, shardId) + "/" + coreNodeName;
}
@@ -2608,12 +2646,6 @@ public class ZkController {
};
}
- public String getLeaderSeqPath(String collection, String coreNodeName) {
- ContextKey key = new ContextKey(collection, coreNodeName);
- ElectionContext context = electionContexts.get(key);
- return context != null ? context.leaderSeqPath : null;
- }
-
/**
* Thrown during leader initiated recovery process if current node is not leader
*/
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
new file mode 100644
index 0000000..7dc0d57
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class used to interact with a ZK term node.
+ * Each ZK term node relates to a shard of a collection and has this format (in json)
+ * <p>
+ * <code>
+ * {
+ * "replicaNodeName1" : 1,
+ * "replicaNodeName2" : 2,
+ * ..
+ * }
+ * </code>
+ * <p>
+ * The values corresponding to replicas are called terms.
+ * Only replicas with the highest term value are considered up to date and are able to become leader and serve queries.
+ * <p>
+ * Terms can only be updated in two strict ways:
+ * <ul>
+ * <li>A replica sets its term equal to the leader's term
+ * <li>The leader increases its term (and the terms of in-sync replicas) by 1
+ * </ul>
+ * This class should not be reused after {@link org.apache.zookeeper.Watcher.Event.KeeperState#Expired} event
+ */
+public class ZkShardTerms implements AutoCloseable{
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final Object writingLock = new Object();
+ private final String collection;
+ private final String shard;
+ private final String znodePath;
+ private final SolrZkClient zkClient;
+ private final Set<CoreTermWatcher> listeners = new HashSet<>();
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+ private Terms terms;
+
+ // Listener of a core for shard's term change events
+ interface CoreTermWatcher {
+ // return true if the listener wants to be triggered again the next time the terms change
+ boolean onTermChanged(Terms terms);
+ }
+
+ public ZkShardTerms(String collection, String shard, SolrZkClient zkClient) {
+ this.znodePath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/terms/" + shard;
+ this.collection = collection;
+ this.shard = shard;
+ this.zkClient = zkClient;
+ ensureTermNodeExist();
+ refreshTerms();
+ retryRegisterWatcher();
+ ObjectReleaseTracker.track(this);
+ }
+
+ /**
+ * Ensure that leader's term is higher than some replica's terms
+ * @param leader coreNodeName of leader
+ * @param replicasNeedingRecovery set of replicas whose terms should be lower than the leader's term
+ */
+ public void ensureTermsIsHigher(String leader, Set<String> replicasNeedingRecovery) {
+ Terms newTerms;
+ while( (newTerms = terms.increaseTerms(leader, replicasNeedingRecovery)) != null) {
+ if (forceSaveTerms(newTerms)) return;
+ }
+ }
+
+ /**
+ * Can this replica become leader, i.e. is this replica's term equal to the leader's term?
+ * @param coreNodeName of the replica
+ * @return true if this replica can become leader, false if otherwise
+ */
+ public boolean canBecomeLeader(String coreNodeName) {
+ return terms.canBecomeLeader(coreNodeName);
+ }
+
+ /**
+ * Has this replica registered its term?
+ * @param coreNodeName of the replica
+ * @return true if this replica registered its term, false if otherwise
+ */
+ public boolean registered(String coreNodeName) {
+ return terms.getTerm(coreNodeName) != null;
+ }
+
+ public void close() {
+ // no watcher will be registered
+ isClosed.set(true);
+ synchronized (listeners) {
+ listeners.clear();
+ }
+ ObjectReleaseTracker.release(this);
+ }
+
+ // package private for testing, only used by tests
+ Map<String, Long> getTerms() {
+ synchronized (writingLock) {
+ return new HashMap<>(terms.values);
+ }
+ }
+
+ /**
+ * Add a listener so that the next time the shard's terms are updated, the listeners will be called
+ */
+ void addListener(CoreTermWatcher listener) {
+ synchronized (listeners) {
+ listeners.add(listener);
+ }
+ }
+
+ /**
+ * Remove the coreNodeName from terms map and also remove any expired listeners
+ * @return Return true if this object should not be reused
+ */
+ boolean removeTerm(CoreDescriptor cd) {
+ int numListeners;
+ synchronized (listeners) {
+ // solrcore already closed
+ listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(terms));
+ numListeners = listeners.size();
+ }
+ Terms newTerms;
+ while ( (newTerms = terms.removeTerm(cd.getCloudDescriptor().getCoreNodeName())) != null) {
+ try {
+ if (saveTerms(newTerms)) return numListeners == 0;
+ } catch (KeeperException.NoNodeException e) {
+ return true;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Register a replica's term (term value will be 0).
+ * If a term is already associated with this replica, do nothing
+ * @param coreNodeName of the replica
+ */
+ void registerTerm(String coreNodeName) {
+ Terms newTerms;
+ while ( (newTerms = terms.registerTerm(coreNodeName)) != null) {
+ if (forceSaveTerms(newTerms)) break;
+ }
+ }
+
+ /**
+ * Set a replica's term equal to the leader's term
+ * @param coreNodeName of the replica
+ */
+ public void setEqualsToMax(String coreNodeName) {
+ Terms newTerms;
+ while ( (newTerms = terms.setEqualsToMax(coreNodeName)) != null) {
+ if (forceSaveTerms(newTerms)) break;
+ }
+ }
+
+ public long getTerm(String coreNodeName) {
+ Long term = terms.getTerm(coreNodeName);
+ return term == null? -1 : term;
+ }
+
+ // package private for testing, only used by tests
+ int getNumListeners() {
+ synchronized (listeners) {
+ return listeners.size();
+ }
+ }
+
+ /**
+ * Set new terms to ZK.
+ * If the corresponding ZK term node does not exist, create it
+ * @param newTerms to be set
+ * @return true if terms is saved successfully to ZK, false if otherwise
+ */
+ private boolean forceSaveTerms(Terms newTerms) {
+ try {
+ return saveTerms(newTerms);
+ } catch (KeeperException.NoNodeException e) {
+ ensureTermNodeExist();
+ return false;
+ }
+ }
+
+ /**
+ * Set new terms to ZK, the version of new terms must match the current ZK term node
+ * @param newTerms to be set
+ * @return true if terms is saved successfully to ZK, false if otherwise
+ * @throws KeeperException.NoNodeException if the corresponding ZK term node does not exist
+ */
+ private boolean saveTerms(Terms newTerms) throws KeeperException.NoNodeException {
+ byte[] znodeData = Utils.toJSON(newTerms.values);
+ try {
+ Stat stat = zkClient.setData(znodePath, znodeData, newTerms.version, true);
+ setNewTerms(new Terms(newTerms.values, stat.getVersion()));
+ return true;
+ } catch (KeeperException.BadVersionException e) {
+ log.info("Failed to save terms, version is not match, retrying");
+ refreshTerms();
+ } catch (KeeperException.NoNodeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error save shard term for collection:" + collection, e);
+ }
+ return false;
+ }
+
+ /**
+ * Create the corresponding ZK term node
+ */
+ private void ensureTermNodeExist() {
+ String path = "/collections/"+collection+ "/terms";
+ try {
+ if (!zkClient.exists(path, true)) {
+ try {
+ zkClient.makePath(path, true);
+ } catch (KeeperException.NodeExistsException e) {
+ // it's okay if another beats us creating the node
+ }
+ }
+ path += "/"+shard;
+ if (!zkClient.exists(path, true)) {
+ try {
+ Map<String, Long> initialTerms = new HashMap<>();
+ zkClient.create(path, Utils.toJSON(initialTerms), CreateMode.PERSISTENT, true);
+ } catch (KeeperException.NodeExistsException e) {
+ // it's okay if another beats us creating the node
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating shard term node in Zookeeper for collection:" + collection, e);
+ } catch (KeeperException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating shard term node in Zookeeper for collection:" + collection, e);
+ }
+ }
+
+ /**
+ * Fetch latest terms from ZK
+ */
+ public void refreshTerms() {
+ Terms newTerms;
+ try {
+ Stat stat = new Stat();
+ byte[] data = zkClient.getData(znodePath, null, stat, true);
+ newTerms = new Terms((Map<String, Long>) Utils.fromJSON(data), stat.getVersion());
+ } catch (KeeperException e) {
+ Thread.interrupted();
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection:" + collection, e);
+ } catch (InterruptedException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection:" + collection, e);
+ }
+
+ setNewTerms(newTerms);
+ }
+
+ /**
+ * Retry registering a watcher on the corresponding ZK term node
+ */
+ private void retryRegisterWatcher() {
+ while (!isClosed.get()) {
+ try {
+ registerWatcher();
+ return;
+ } catch (KeeperException.SessionExpiredException | KeeperException.AuthFailedException e) {
+ isClosed.set(true);
+ log.error("Failed watching shard term for collection: {} due to unrecoverable exception", collection, e);
+ return;
+ } catch (KeeperException e) {
+ log.warn("Failed watching shard term for collection:{}, retrying!", collection, e);
+ try {
+ zkClient.getConnectionManager().waitForConnected(zkClient.getZkClientTimeout());
+ } catch (TimeoutException te) {
+ if (Thread.interrupted()) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error watching shard term for collection:" + collection, te);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Register a watcher on the corresponding ZK term node
+ */
+ private void registerWatcher() throws KeeperException {
+ Watcher watcher = event -> {
+ // session events are not change events, and do not remove the watcher
+ if (Watcher.Event.EventType.None == event.getType()) {
+ return;
+ }
+ retryRegisterWatcher();
+ // Some events may be missed while registering the watcher, so it is safer to refresh terms after registering it
+ refreshTerms();
+ };
+ try {
+ // exists operation is faster than getData operation
+ zkClient.exists(znodePath, watcher, true);
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error watching shard term for collection:" + collection, e);
+ }
+ }
+
+
+ /**
+ * Atomically update {@link ZkShardTerms#terms} and call listeners
+ * @param newTerms to be set
+ */
+ private void setNewTerms(Terms newTerms) {
+ boolean isChanged = false;
+ synchronized (writingLock) {
+ if (terms == null || newTerms.version > terms.version) {
+ terms = newTerms;
+ isChanged = true;
+ }
+ }
+ if (isChanged) onTermUpdates(newTerms);
+ }
+
+ private void onTermUpdates(Terms newTerms) {
+ synchronized (listeners) {
+ listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(newTerms));
+ }
+ }
+
+ /**
+ * Hold values of terms, this class is immutable
+ */
+ static class Terms {
+ private final Map<String, Long> values;
+ // ZK node version
+ private final int version;
+
+ public Terms () {
+ this(new HashMap<>(), 0);
+ }
+
+ public Terms(Map<String, Long> values, int version) {
+ this.values = values;
+ this.version = version;
+ }
+
+ /**
+ * Can this replica become leader, i.e. is this replica's term equal to the leader's term?
+ * @param coreNodeName of the replica
+ * @return true if this replica can become leader, false if otherwise
+ */
+ boolean canBecomeLeader(String coreNodeName) {
+ if (values.isEmpty()) return true;
+ long maxTerm = Collections.max(values.values());
+ return values.getOrDefault(coreNodeName, 0L) == maxTerm;
+ }
+
+ Long getTerm(String coreNodeName) {
+ return values.get(coreNodeName);
+ }
+
+ /**
+ * Return a new {@link Terms} in which the term of {@code leader} is higher than the terms of {@code replicasNeedingRecovery}
+ * @param leader coreNodeName of leader
+ * @param replicasNeedingRecovery set of replicas whose terms should be lower than the leader's term
+ * @return null if the term of {@code leader} is already higher than the terms of {@code replicasNeedingRecovery}
+ */
+ Terms increaseTerms(String leader, Set<String> replicasNeedingRecovery) {
+ if (!values.containsKey(leader)) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can not find leader's term " + leader);
+ }
+
+ boolean changed = false;
+ boolean foundReplicasInLowerTerms = false;
+
+ HashMap<String, Long> newValues = new HashMap<>(values);
+ long leaderTerm = newValues.get(leader);
+ for (String replica : newValues.keySet()) {
+ if (replicasNeedingRecovery.contains(replica)) foundReplicasInLowerTerms = true;
+ if (Objects.equals(newValues.get(replica), leaderTerm)) {
+ if(replicasNeedingRecovery.contains(replica)) {
+ changed = true;
+ } else {
+ newValues.put(replica, leaderTerm+1);
+ }
+ }
+ }
+
+ // We should skip the optimization if there are no replicasNeedingRecovery present in local terms,
+ // this may indicate that the current value is stale
+ if (!changed && foundReplicasInLowerTerms) return null;
+ return new Terms(newValues, version);
+ }
+
+ /**
+ * Return a new {@link Terms} in which the term of {@code coreNodeName} is removed
+ * @param coreNodeName of the replica
+ * @return null if a term for {@code coreNodeName} does not exist
+ */
+ Terms removeTerm(String coreNodeName) {
+ if (!values.containsKey(coreNodeName)) return null;
+
+ HashMap<String, Long> newValues = new HashMap<>(values);
+ newValues.remove(coreNodeName);
+ return new Terms(newValues, version);
+ }
+
+ /**
+ * Return a new {@link Terms} in which the associated term of {@code coreNodeName} is not null
+ * @param coreNodeName of the replica
+ * @return null if a term for {@code coreNodeName} already exists
+ */
+ Terms registerTerm(String coreNodeName) {
+ if (values.containsKey(coreNodeName)) return null;
+
+ HashMap<String, Long> newValues = new HashMap<>(values);
+ newValues.put(coreNodeName, 0L);
+ return new Terms(newValues, version);
+ }
+
+ /**
+ * Return a new {@link Terms} in which the term of {@code coreNodeName} is max
+ * @param coreNodeName of the replica
+ * @return null if term of {@code coreNodeName} is already maximum
+ */
+ Terms setEqualsToMax(String coreNodeName) {
+ long maxTerm;
+ try {
+ maxTerm = Collections.max(values.values());
+ } catch (NoSuchElementException e){
+ maxTerm = 0;
+ }
+ if (values.get(coreNodeName) == maxTerm) return null;
+
+ HashMap<String, Long> newValues = new HashMap<>(values);
+ newValues.put(coreNodeName, maxTerm);
+ return new Terms(newValues, version);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 4c6ce47..428ad83 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
@@ -392,7 +393,22 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
public static void createCollectionZkNode(DistribStateManager stateManager, String collection, Map<String,String> params) {
log.debug("Check for collection zkNode:" + collection);
String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
-
+ // clean up old terms node
+ String termsPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/terms";
+ try {
+ if (stateManager.hasData(termsPath)) {
+ List<String> paths = stateManager.listData(termsPath);
+ for (String path : paths) {
+ stateManager.removeData(termsPath + "/" + path, -1);
+ }
+ stateManager.removeData(termsPath, -1);
+ }
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Error deleting old term nodes for collection from Zookeeper", e);
+ } catch (KeeperException | IOException | BadVersionException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Error deleting old term nodes for collection from Zookeeper", e);
+ }
try {
if (!stateManager.hasData(collectionPath)) {
log.debug("Creating collection in ZooKeeper:" + collection);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index cebb2d0..dcc3de6 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -28,8 +28,10 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import java.util.function.BiConsumer;
import com.google.common.collect.ImmutableSet;
@@ -47,6 +49,7 @@ import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.cloud.OverseerTaskQueue;
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkShardTerms;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.cloud.rule.ReplicaAssigner;
@@ -1067,7 +1070,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
}
private static void forceLeaderElection(SolrQueryRequest req, CollectionsHandler handler) {
- ClusterState clusterState = handler.coreContainer.getZkController().getClusterState();
+ ZkController zkController = handler.coreContainer.getZkController();
+ ClusterState clusterState = zkController.getClusterState();
String collectionName = req.getParams().required().get(COLLECTION_PROP);
String sliceId = req.getParams().required().get(SHARD_ID_PROP);
@@ -1079,7 +1083,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
"No shard with name " + sliceId + " exists for collection " + collectionName);
}
- try {
+ try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, slice.getName(), zkController.getZkClient())) {
// if an active replica is the leader, then all is fine already
Replica leader = slice.getLeader();
if (leader != null && leader.getState() == State.ACTIVE) {
@@ -1096,20 +1100,37 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
handler.coreContainer.getZkController().getZkClient().clean(lirPath);
}
+ final Set<String> liveNodes = clusterState.getLiveNodes();
+ List<Replica> liveReplicas = slice.getReplicas().stream()
+ .filter(rep -> liveNodes.contains(rep.getNodeName())).collect(Collectors.toList());
+ boolean shouldIncreaseReplicaTerms = liveReplicas.stream()
+ .noneMatch(rep -> zkShardTerms.registered(rep.getName()) && zkShardTerms.canBecomeLeader(rep.getName()));
+ // we won't increase replicas' terms if there exists a live replica with a term equal to the leader's term
+ if (shouldIncreaseReplicaTerms) {
+ OptionalLong optionalMaxTerm = liveReplicas.stream()
+ .filter(rep -> zkShardTerms.registered(rep.getName()))
+ .mapToLong(rep -> zkShardTerms.getTerm(rep.getName()))
+ .max();
+ // increase the terms of the least out-of-sync replicas
+ if (optionalMaxTerm.isPresent()) {
+ liveReplicas.stream()
+ .filter(rep -> zkShardTerms.getTerm(rep.getName()) == optionalMaxTerm.getAsLong())
+ .forEach(rep -> zkShardTerms.setEqualsToMax(rep.getName()));
+ }
+ }
+
// Call all live replicas to prepare themselves for leadership, e.g. set last published
// state to active.
- for (Replica rep : slice.getReplicas()) {
- if (clusterState.getLiveNodes().contains(rep.getNodeName())) {
- ShardHandler shardHandler = handler.coreContainer.getShardHandlerFactory().getShardHandler();
+ for (Replica rep : liveReplicas) {
+ ShardHandler shardHandler = handler.coreContainer.getShardHandlerFactory().getShardHandler();
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(CoreAdminParams.ACTION, CoreAdminAction.FORCEPREPAREFORLEADERSHIP.toString());
- params.set(CoreAdminParams.CORE, rep.getStr("core"));
- String nodeName = rep.getNodeName();
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.FORCEPREPAREFORLEADERSHIP.toString());
+ params.set(CoreAdminParams.CORE, rep.getStr("core"));
+ String nodeName = rep.getNodeName();
- OverseerCollectionMessageHandler.sendShardRequest(nodeName, params, shardHandler, null, null,
- CommonParams.CORES_HANDLER_PATH, handler.coreContainer.getZkController().getZkStateReader()); // synchronous request
- }
+ OverseerCollectionMessageHandler.sendShardRequest(nodeName, params, shardHandler, null, null,
+ CommonParams.CORES_HANDLER_PATH, handler.coreContainer.getZkController().getZkStateReader()); // synchronous request
}
// Wait till we have an active leader
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index 0a6d5ce..3647735 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -21,6 +21,7 @@ import java.lang.invoke.MethodHandles;
import java.util.Objects;
import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ZkShardTerms;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@@ -124,6 +125,12 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
log.warn("Leader " + core.getName() + " ignoring request to be in the recovering state because it is live and active.");
}
+ ZkShardTerms shardTerms = coreContainer.getZkController().getShardTerms(collectionName, slice.getName());
+ // if the replica is waiting for leader to see recovery state, the leader should refresh its terms
+ if (waitForState == Replica.State.RECOVERING && shardTerms.registered(coreNodeName) && !shardTerms.canBecomeLeader(coreNodeName)) {
+ shardTerms.refreshTerms();
+ }
+
boolean onlyIfActiveCheckResult = onlyIfLeaderActive != null && onlyIfLeaderActive && localState != Replica.State.ACTIVE;
log.info("In WaitForState(" + waitForState + "): collection=" + collectionName + ", shard=" + slice.getName() +
", thisCore=" + core.getName() + ", leaderDoesNotNeedRecovery=" + leaderDoesNotNeedRecovery +
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
index b418a19..739604f 100644
--- a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
@@ -308,19 +308,20 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
// after the current one, and if there is, bail
boolean locked = recoveryLock.tryLock();
try {
- if (!locked) {
- if (recoveryWaiting.get() > 0) {
- return;
- }
- recoveryWaiting.incrementAndGet();
- } else {
- recoveryWaiting.incrementAndGet();
- cancelRecovery();
+ if (!locked && recoveryWaiting.get() > 0) {
+ return;
}
+
+ recoveryWaiting.incrementAndGet();
+ cancelRecovery();
recoveryLock.lock();
try {
- recoveryWaiting.decrementAndGet();
+ // don't use recoveryLock.getQueueLength() for this
+ if (recoveryWaiting.decrementAndGet() > 0) {
+ // another recovery waiting behind us, let it run now instead of after we finish
+ return;
+ }
// to be air tight we must also check after lock
if (cc.isShutDown()) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8c8d78a4/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index de031a2..3cff171 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -44,6 +45,7 @@ import org.apache.solr.client.solrj.response.SimpleSolrResponse;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkShardTerms;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@@ -184,6 +186,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private final boolean cloneRequiredOnLeader;
private final Replica.Type replicaType;
+ @Deprecated
+ // this flag, used for testing rolling updates, should be removed by SOLR-11812
+ private final boolean isOldLIRMode;
+
public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
this(req, rsp, new AtomicUpdateDocumentMerger(req), next);
}
@@ -202,6 +208,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
this.ulog = req.getCore().getUpdateHandler().getUpdateLog();
this.vinfo = ulog == null ? null : ulog.getVersionInfo();
+ this.isOldLIRMode = !"new".equals(req.getCore().getCoreDescriptor().getCoreProperty("lirVersion", "new"));
versionsStored = this.vinfo != null && this.vinfo.getVersionField() != null;
returnVersions = req.getParams().getBool(UpdateParams.VERSIONS ,false);
@@ -343,13 +350,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
List<Node> nodes = new ArrayList<>(replicaProps.size());
+ ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
for (ZkCoreNodeProps props : replicaProps) {
- if (skipList != null) {
- boolean skip = skipListSet.contains(props.getCoreUrl());
- log.info("check url:" + props.getCoreUrl() + " against:" + skipListSet + " result:" + skip);
- if (!skip) {
- nodes.add(new StdNode(props, collection, shardId));
- }
+ String coreNodeName = ((Replica) props.getNodeProps()).getName();
+ if (skipList != null && skipListSet.contains(props.getCoreUrl())) {
+ log.info("check url:" + props.getCoreUrl() + " against:" + skipListSet + " result:true");
+ } else if(!isOldLIRMode && zkShardTerms.registered(coreNodeName) && !zkShardTerms.canBecomeLeader(coreNodeName)) {
+ log.info("skip url:{} cause its term is less than leader", props.getCoreUrl());
} else {
nodes.add(new StdNode(props, collection, shardId));
}
@@ -751,7 +758,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// TODO - we may need to tell about more than one error...
List<Error> errorsForClient = new ArrayList<>(errors.size());
-
+ Map<ShardInfo, Set<String>> failedReplicas = new HashMap<>();
for (final SolrCmdDistributor.Error error : errors) {
if (error.req.node instanceof RetryNode) {
@@ -843,18 +850,27 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
&& foundErrorNodeInReplicaList // we found an error for one of replicas
&& !stdNode.getNodeProps().getCoreUrl().equals(leaderProps.getCoreUrl())) { // we do not want to put ourself into LIR
try {
+ String coreNodeName = ((Replica) stdNode.getNodeProps().getNodeProps()).getName();
// if false, then the node is probably not "live" anymore
// and we do not need to send a recovery message
Throwable rootCause = SolrException.getRootCause(error.e);
- log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
- zkController.ensureReplicaInLeaderInitiatedRecovery(
- req.getCore().getCoreContainer(),
- collection,
- shardId,
- stdNode.getNodeProps(),
- req.getCore().getCoreDescriptor(),
- false /* forcePublishState */
- );
+ if (!isOldLIRMode && zkController.getShardTerms(collection, shardId).registered(coreNodeName)) {
+ log.error("Setting up to try to start recovery on replica {} with url {} by increasing leader term", coreNodeName, replicaUrl, rootCause);
+ ShardInfo shardInfo = new ShardInfo(collection, shardId, leaderCoreNodeName);
+ failedReplicas.putIfAbsent(shardInfo, new HashSet<>());
+ failedReplicas.get(shardInfo).add(coreNodeName);
+ } else {
+ // The replica did not register its term, so it must run with old LIR implementation
+ log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
+ zkController.ensureReplicaInLeaderInitiatedRecovery(
+ req.getCore().getCoreContainer(),
+ collection,
+ shardId,
+ stdNode.getNodeProps(),
+ req.getCore().getCoreDescriptor(),
+ false /* forcePublishState */
+ );
+ }
} catch (Exception exc) {
Throwable setLirZnodeFailedCause = SolrException.getRootCause(exc);
log.error("Leader failed to set replica " +
@@ -873,6 +889,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
}
+ if (!isOldLIRMode) {
+ for (Map.Entry<ShardInfo, Set<String>> entry : failedReplicas.entrySet()) {
+ ShardInfo shardInfo = entry.getKey();
+ zkController.getShardTerms(shardInfo.collection, shardInfo.shard).ensureTermsIsHigher(shardInfo.leader, entry.getValue());
+ }
+ }
// in either case, we need to attach the achieved and min rf to the response.
if (leaderReplicationTracker != null || rollupReplicationTracker != null) {
int achievedRf = Integer.MAX_VALUE;
@@ -905,6 +927,38 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
+ private class ShardInfo {
+ private String collection;
+ private String shard;
+ private String leader;
+
+ public ShardInfo(String collection, String shard, String leader) {
+ this.collection = collection;
+ this.shard = shard;
+ this.leader = leader;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ShardInfo shardInfo = (ShardInfo) o;
+
+ if (!collection.equals(shardInfo.collection)) return false;
+ if (!shard.equals(shardInfo.shard)) return false;
+ return leader.equals(shardInfo.leader);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = collection.hashCode();
+ result = 31 * result + shard.hashCode();
+ result = 31 * result + leader.hashCode();
+ return result;
+ }
+ }
+
// must be synchronized by bucket
private void doLocalAdd(AddUpdateCommand cmd) throws IOException {