You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/01/29 08:56:28 UTC
[1/2] lucene-solr:master: SOLR-11702: Redesign current LIR
implementation
Repository: lucene-solr
Updated Branches:
refs/heads/master 00d453d27 -> 27ef65306
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
index 89ff67a..46ecdb6 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
@@ -21,6 +21,7 @@ import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Properties;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
@@ -70,18 +71,166 @@ public class ForceLeaderTest extends HttpPartitionTest {
}
+ /**
+ * Tests that FORCELEADER can get an active leader even when only replicas with a term lower than the leader's term are live
+ */
+ @Test
+ @Slow
+ public void testReplicasInLowerTerms() throws Exception {
+ handle.put("maxScore", SKIPVAL);
+ handle.put("timestamp", SKIPVAL);
+
+ String testCollectionName = "forceleader_lower_terms_collection";
+ createCollection(testCollectionName, "conf1", 1, 3, 1);
+ cloudClient.setDefaultCollection(testCollectionName);
+
+ try {
+ List<Replica> notLeaders = ensureAllReplicasAreActive(testCollectionName, SHARD1, 1, 3, maxWaitSecsToSeeAllActive);
+ assertEquals("Expected 2 replicas for collection " + testCollectionName
+ + " but found " + notLeaders.size() + "; clusterState: "
+ + printClusterStateInfo(testCollectionName), 2, notLeaders.size());
+
+ Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, SHARD1);
+ JettySolrRunner notLeader0 = getJettyOnPort(getReplicaPort(notLeaders.get(0)));
+ ZkController zkController = notLeader0.getCoreContainer().getZkController();
+
+ log.info("Before put non leaders into lower term: " + printClusterStateInfo());
+ putNonLeadersIntoLowerTerm(testCollectionName, SHARD1, zkController, leader, notLeaders);
+
+ for (Replica replica : notLeaders) {
+ waitForState(testCollectionName, replica.getName(), State.DOWN, 60000);
+ }
+ waitForState(testCollectionName, leader.getName(), State.DOWN, 60000);
+ cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName);
+ ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
+ int numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
+ assertEquals("Expected only 0 active replica but found " + numActiveReplicas +
+ "; clusterState: " + printClusterStateInfo(), 0, numActiveReplicas);
+
+ int numReplicasOnLiveNodes = 0;
+ for (Replica rep : clusterState.getCollection(testCollectionName).getSlice(SHARD1).getReplicas()) {
+ if (clusterState.getLiveNodes().contains(rep.getNodeName())) {
+ numReplicasOnLiveNodes++;
+ }
+ }
+ assertEquals(2, numReplicasOnLiveNodes);
+ log.info("Before forcing leader: " + printClusterStateInfo());
+ // Assert there is no leader yet
+ assertNull("Expected no leader right now. State: " + clusterState.getCollection(testCollectionName).getSlice(SHARD1),
+ clusterState.getCollection(testCollectionName).getSlice(SHARD1).getLeader());
+
+ assertSendDocFails(3);
+
+ log.info("Do force leader...");
+ doForceLeader(cloudClient, testCollectionName, SHARD1);
+
+ // By now we have an active leader. Wait for recoveries to finish
+ waitForRecoveriesToFinish(testCollectionName, cloudClient.getZkStateReader(), true);
+
+ cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName);
+ clusterState = cloudClient.getZkStateReader().getClusterState();
+ log.info("After forcing leader: " + clusterState.getCollection(testCollectionName).getSlice(SHARD1));
+ // we have a leader
+ Replica newLeader = clusterState.getCollectionOrNull(testCollectionName).getSlice(SHARD1).getLeader();
+ assertNotNull(newLeader);
+ // leader is active
+ assertEquals(State.ACTIVE, newLeader.getState());
+
+ numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
+ assertEquals(2, numActiveReplicas);
+
+ // Assert that indexing works again
+ log.info("Sending doc 4...");
+ sendDoc(4);
+ log.info("Committing...");
+ cloudClient.commit();
+ log.info("Doc 4 sent and commit issued");
+
+ assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 1);
+ assertDocsExistInAllReplicas(notLeaders, testCollectionName, 4, 4);
+
+ // Docs 1 and 4 should be here. 2 was lost during the partition, 3 had failed to be indexed.
+ log.info("Checking doc counts...");
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.add("q", "*:*");
+ assertEquals("Expected only 2 documents in the index", 2, cloudClient.query(params).getResults().getNumFound());
+
+ bringBackOldLeaderAndSendDoc(testCollectionName, leader, notLeaders, 5);
+ } finally {
+ log.info("Cleaning up after the test.");
+ // try to clean up
+ attemptCollectionDelete(cloudClient, testCollectionName);
+ }
+ }
+
+ void putNonLeadersIntoLowerTerm(String collectionName, String shard, ZkController zkController, Replica leader, List<Replica> notLeaders) throws Exception {
+ SocketProxy[] nonLeaderProxies = new SocketProxy[notLeaders.size()];
+ for (int i = 0; i < notLeaders.size(); i++)
+ nonLeaderProxies[i] = getProxyForReplica(notLeaders.get(i));
+
+ sendDoc(1);
+
+ // ok, now introduce a network partition between the leader and both replicas
+ log.info("Closing proxies for the non-leader replicas...");
+ for (SocketProxy proxy : nonLeaderProxies)
+ proxy.close();
+ getProxyForReplica(leader).close();
+
+ // indexing during a partition
+ log.info("Sending a doc during the network partition...");
+ JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
+ sendDoc(2, null, leaderJetty);
+
+ for (Replica replica : notLeaders) {
+ waitForState(collectionName, replica.getName(), State.DOWN, 60000);
+ }
+
+ // Kill the leader
+ log.info("Killing leader for shard1 of " + collectionName + " on node " + leader.getNodeName() + "");
+ leaderJetty.stop();
+
+ // Wait for a steady state, till the shard is leaderless
+ log.info("Sleep and periodically wake up to check for state...");
+ for (int i = 0; i < 20; i++) {
+ ClusterState clusterState = zkController.getZkStateReader().getClusterState();
+ boolean allDown = true;
+ for (Replica replica : clusterState.getCollection(collectionName).getSlice(shard).getReplicas()) {
+ if (replica.getState() != State.DOWN) {
+ allDown = false;
+ }
+ }
+ if (allDown && clusterState.getCollection(collectionName).getSlice(shard).getLeader() == null) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ log.info("Waking up...");
+
+ // remove the network partition
+ log.info("Reopening the proxies for the non-leader replicas...");
+ for (SocketProxy proxy : nonLeaderProxies)
+ proxy.reopen();
+
+ try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, shard, cloudClient.getZkStateReader().getZkClient())) {
+ for (Replica notLeader : notLeaders) {
+ assertTrue(zkShardTerms.getTerm(leader.getName()) > zkShardTerms.getTerm(notLeader.getName()));
+ }
+ }
+ }
+
/***
* Tests that FORCELEADER can get an active leader after leader puts all replicas in LIR and itself goes down,
* hence resulting in a leaderless shard.
*/
@Test
@Slow
+ //TODO remove in SOLR-11812
public void testReplicasInLIRNoLeader() throws Exception {
handle.put("maxScore", SKIPVAL);
handle.put("timestamp", SKIPVAL);
String testCollectionName = "forceleader_test_collection";
- createCollection(testCollectionName, "conf1", 1, 3, 1);
+ createOldLirCollection(testCollectionName, 3);
cloudClient.setDefaultCollection(testCollectionName);
try {
@@ -157,6 +306,28 @@ public class ForceLeaderTest extends HttpPartitionTest {
}
}
+ private void createOldLirCollection(String collection, int numReplicas) throws IOException, SolrServerException {
+ if (onlyLeaderIndexes) {
+ CollectionAdminRequest
+ .createCollection(collection, "conf1", 1, 0, numReplicas, 0)
+ .setCreateNodeSet("")
+ .process(cloudClient);
+ } else {
+ CollectionAdminRequest.createCollection(collection, "conf1", 1, numReplicas)
+ .setCreateNodeSet("")
+ .process(cloudClient);
+ }
+ Properties oldLir = new Properties();
+ oldLir.setProperty("lirVersion", "old");
+ for (int i = 0; i < numReplicas; i++) {
+ // this is the only way to create replicas that run the old LIR implementation
+ CollectionAdminRequest
+ .addReplicaToShard(collection, "shard1", onlyLeaderIndexes? Replica.Type.TLOG: Replica.Type.NRT)
+ .setProperties(oldLir)
+ .process(cloudClient);
+ }
+ }
+
/**
* Test that FORCELEADER can set last published state of all down (live) replicas to active (so
* that they become worthy candidates for leader election).
@@ -167,7 +338,7 @@ public class ForceLeaderTest extends HttpPartitionTest {
handle.put("timestamp", SKIPVAL);
String testCollectionName = "forceleader_last_published";
- createCollection(testCollectionName, "conf1", 1, 3, 1);
+ createOldLirCollection(testCollectionName, 3);
cloudClient.setDefaultCollection(testCollectionName);
log.info("Collection created: " + testCollectionName);
@@ -204,33 +375,6 @@ public class ForceLeaderTest extends HttpPartitionTest {
}
}
- protected void unsetLeader(String collection, String slice) throws Exception {
- ZkDistributedQueue inQueue = Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient());
- ZkStateReader zkStateReader = cloudClient.getZkStateReader();
-
- ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
- ZkStateReader.SHARD_ID_PROP, slice,
- ZkStateReader.COLLECTION_PROP, collection);
- inQueue.offer(Utils.toJSON(m));
-
- ClusterState clusterState = null;
- boolean transition = false;
- for (int counter = 10; counter > 0; counter--) {
- clusterState = zkStateReader.getClusterState();
- Replica newLeader = clusterState.getCollection(collection).getSlice(slice).getLeader();
- if (newLeader == null) {
- transition = true;
- break;
- }
- Thread.sleep(1000);
- }
-
- if (!transition) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not unset replica leader" +
- ". Cluster state: " + printClusterStateInfo(collection));
- }
- }
-
protected void setReplicaState(String collection, String slice, Replica replica, Replica.State state) throws Exception {
DistributedQueue inQueue = Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient());
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
@@ -263,23 +407,6 @@ public class ForceLeaderTest extends HttpPartitionTest {
". Last known state of the replica: " + replicaState);
}
}
-
- /*protected void setLastPublishedState(String collection, String slice, Replica replica, Replica.State state) throws SolrServerException, IOException,
- KeeperException, InterruptedException {
- ZkStateReader zkStateReader = cloudClient.getZkStateReader();
- String baseUrl = zkStateReader.getBaseUrlForNodeName(replica.getNodeName());
-
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(CoreAdminParams.ACTION, CoreAdminAction.FORCEPREPAREFORLEADERSHIP.toString());
- params.set(CoreAdminParams.CORE, replica.getStr("core"));
- params.set(ZkStateReader.STATE_PROP, state.toString());
-
- SolrRequest<SimpleSolrResponse> req = new GenericSolrRequest(METHOD.GET, "/admin/cores", params);
- NamedList resp = null;
- try (HttpSolrClient hsc = new HttpSolrClient(baseUrl)) {
- resp = hsc.request(req);
- }
- }*/
protected Replica.State getLastPublishedState(String collection, String slice, Replica replica) throws SolrServerException, IOException,
KeeperException, InterruptedException {
@@ -377,6 +504,7 @@ public class ForceLeaderTest extends HttpPartitionTest {
// Bring back the leader which was stopped
log.info("Bringing back originally killed leader...");
JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
+ getProxyForReplica(leader).reopen();
leaderJetty.start();
waitForRecoveriesToFinish(collection, cloudClient.getZkStateReader(), true);
cloudClient.getZkStateReader().forceUpdateCollection(collection);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/test/org/apache/solr/cloud/HttpPartitionOnCommitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionOnCommitTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionOnCommitTest.java
new file mode 100644
index 0000000..1580661
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionOnCommitTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import org.apache.http.NoHttpResponseException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.util.RTimer;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+
+public class HttpPartitionOnCommitTest extends BasicDistributedZkTest {
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final long sleepMsBeforeHealPartition = 2000L;
+
+ private final boolean onlyLeaderIndexes = random().nextBoolean();
+
+ public HttpPartitionOnCommitTest() {
+ super();
+ sliceCount = 1;
+ fixShardCount(4);
+ }
+
+ @Override
+ protected boolean useTlogReplicas() {
+ return onlyLeaderIndexes;
+ }
+
+ @Override
+ @Test
+ public void test() throws Exception {
+ oneShardTest();
+ multiShardTest();
+ }
+
+ private void multiShardTest() throws Exception {
+
+ log.info("Running multiShardTest");
+
+ // create a collection that has 2 shards and 2 replicas
+ String testCollectionName = "c8n_2x2_commits";
+ createCollection(testCollectionName, "conf1", 2, 2, 1);
+ cloudClient.setDefaultCollection(testCollectionName);
+
+ List<Replica> notLeaders =
+ ensureAllReplicasAreActive(testCollectionName, "shard1", 2, 2, 30);
+ assertTrue("Expected 1 replicas for collection " + testCollectionName
+ + " but found " + notLeaders.size() + "; clusterState: "
+ + printClusterStateInfo(),
+ notLeaders.size() == 1);
+
+ log.info("All replicas active for "+testCollectionName);
+
+ // let's put the leader in its own partition, no replicas can contact it now
+ Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+ log.info("Creating partition to leader at "+leader.getCoreUrl());
+ SocketProxy leaderProxy = getProxyForReplica(leader);
+ leaderProxy.close();
+
+ // let's find the leader of shard2 and ask it to commit
+ Replica shard2Leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard2");
+ sendCommitWithRetry(shard2Leader);
+
+ Thread.sleep(sleepMsBeforeHealPartition);
+
+ cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName); // get the latest state
+ leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+ assertSame("Leader was not active", Replica.State.ACTIVE, leader.getState());
+
+ log.info("Healing partitioned replica at "+leader.getCoreUrl());
+ leaderProxy.reopen();
+ Thread.sleep(sleepMsBeforeHealPartition);
+
+ // try to clean up
+ attemptCollectionDelete(cloudClient, testCollectionName);
+
+ log.info("multiShardTest completed OK");
+ }
+
+ private void oneShardTest() throws Exception {
+ log.info("Running oneShardTest");
+
+ // create a collection that has 1 shard and 3 replicas
+ String testCollectionName = "c8n_1x3_commits";
+ createCollection(testCollectionName, "conf1", 1, 3, 1);
+ cloudClient.setDefaultCollection(testCollectionName);
+
+ List<Replica> notLeaders =
+ ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, 30);
+ assertTrue("Expected 2 replicas for collection " + testCollectionName
+ + " but found " + notLeaders.size() + "; clusterState: "
+ + printClusterStateInfo(),
+ notLeaders.size() == 2);
+
+ log.info("All replicas active for "+testCollectionName);
+
+ // let's put the leader in its own partition, no replicas can contact it now
+ Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+ log.info("Creating partition to leader at "+leader.getCoreUrl());
+ SocketProxy leaderProxy = getProxyForReplica(leader);
+ leaderProxy.close();
+
+ Replica replica = notLeaders.get(0);
+ sendCommitWithRetry(replica);
+ Thread.sleep(sleepMsBeforeHealPartition);
+
+ cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName); // get the latest state
+ leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
+ assertSame("Leader was not active", Replica.State.ACTIVE, leader.getState());
+
+ log.info("Healing partitioned replica at "+leader.getCoreUrl());
+ leaderProxy.reopen();
+ Thread.sleep(sleepMsBeforeHealPartition);
+
+ // try to clean up
+ attemptCollectionDelete(cloudClient, testCollectionName);
+
+ log.info("oneShardTest completed OK");
+ }
+
+ /**
+ * Overrides the parent implementation to install a SocketProxy in front of the Jetty server.
+ */
+ @Override
+ public JettySolrRunner createJetty(File solrHome, String dataDir,
+ String shardList, String solrConfigOverride, String schemaOverride, Replica.Type replicaType)
+ throws Exception {
+ return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride, replicaType);
+ }
+
+ protected void sendCommitWithRetry(Replica replica) throws Exception {
+ String replicaCoreUrl = replica.getCoreUrl();
+ log.info("Sending commit request to: "+replicaCoreUrl);
+ final RTimer timer = new RTimer();
+ try (HttpSolrClient client = getHttpSolrClient(replicaCoreUrl)) {
+ try {
+ client.commit();
+
+ log.info("Sent commit request to {} OK, took {}ms", replicaCoreUrl, timer.getTime());
+ } catch (Exception exc) {
+ Throwable rootCause = SolrException.getRootCause(exc);
+ if (rootCause instanceof NoHttpResponseException) {
+ log.warn("No HTTP response from sending commit request to "+replicaCoreUrl+
+ "; will re-try after waiting 3 seconds");
+ Thread.sleep(3000);
+ client.commit();
+ log.info("Second attempt at sending commit to "+replicaCoreUrl+" succeeded.");
+ } else {
+ throw exc;
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
index 58ebbf9..a18aa31 100644
--- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.JSONTestUtil;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
+import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -49,15 +50,21 @@ import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.util.MockCoreContainer.MockCoreDescriptor;
import org.apache.solr.util.RTimer;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.common.cloud.Replica.State.DOWN;
+import static org.apache.solr.common.cloud.Replica.State.RECOVERING;
+
/**
* Simulates HTTP partitions between a leader and replica but the replica does
* not lose its ZooKeeper connection.
@@ -121,6 +128,8 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
testLeaderInitiatedRecoveryCRUD();
+ testDoRecoveryOnRestart();
+
// Tests that if we set a minRf that's not satisfied, no recovery is requested, but if minRf is satisfied,
// recovery is requested
testMinRf();
@@ -147,8 +156,9 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
}
/**
- * Tests handling of lir state znodes.
+ * Tests handling of the different formats of LIR znodes.
*/
+ //TODO remove in SOLR-11812
protected void testLeaderInitiatedRecoveryCRUD() throws Exception {
String testCollectionName = "c8n_crud_1x2";
String shardId = "shard1";
@@ -182,11 +192,11 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
};
}
};
-
- zkController.updateLeaderInitiatedRecoveryState(testCollectionName, shardId, notLeader.getName(), Replica.State.DOWN, cd, true);
+
+ zkController.updateLeaderInitiatedRecoveryState(testCollectionName, shardId, notLeader.getName(), DOWN, cd, true);
Map<String,Object> lirStateMap = zkController.getLeaderInitiatedRecoveryStateObject(testCollectionName, shardId, notLeader.getName());
assertNotNull(lirStateMap);
- assertSame(Replica.State.DOWN, Replica.State.getState((String) lirStateMap.get(ZkStateReader.STATE_PROP)));
+ assertSame(DOWN, Replica.State.getState((String) lirStateMap.get(ZkStateReader.STATE_PROP)));
// test old non-json format handling
SolrZkClient zkClient = zkController.getZkClient();
@@ -194,13 +204,64 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
zkClient.setData(znodePath, "down".getBytes(StandardCharsets.UTF_8), true);
lirStateMap = zkController.getLeaderInitiatedRecoveryStateObject(testCollectionName, shardId, notLeader.getName());
assertNotNull(lirStateMap);
- assertSame(Replica.State.DOWN, Replica.State.getState((String) lirStateMap.get(ZkStateReader.STATE_PROP)));
+ assertSame(DOWN, Replica.State.getState((String) lirStateMap.get(ZkStateReader.STATE_PROP)));
zkClient.delete(znodePath, -1, false);
// try to clean up
attemptCollectionDelete(cloudClient, testCollectionName);
}
+ private void testDoRecoveryOnRestart() throws Exception {
+ String testCollectionName = "collDoRecoveryOnRestart";
+ try {
+ // Inject a pause into the recovery op so that the replica won't be able to finish recovery
+ System.setProperty("solr.cloud.wait-for-updates-with-stale-state-pause", String.valueOf(Integer.MAX_VALUE));
+
+ createCollection(testCollectionName, "conf1", 1, 2, 1);
+ cloudClient.setDefaultCollection(testCollectionName);
+
+ sendDoc(1, 2);
+
+ JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(testCollectionName, "shard1", 1000)));
+ List<Replica> notLeaders =
+ ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
+ assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 1);
+
+ SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
+ SocketProxy leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
+
+ proxy0.close();
+ leaderProxy.close();
+
+ // indexing during a partition
+ int achievedRf = sendDoc(2, 1, leaderJetty);
+ assertEquals("Unexpected achieved replication factor", 1, achievedRf);
+ try (ZkShardTerms zkShardTerms = new ZkShardTerms(testCollectionName, "shard1", cloudClient.getZkStateReader().getZkClient())) {
+ assertFalse(zkShardTerms.canBecomeLeader(notLeaders.get(0).getName()));
+ }
+ waitForState(testCollectionName, notLeaders.get(0).getName(), DOWN, 10000);
+
+ // heal partition
+ proxy0.reopen();
+ leaderProxy.reopen();
+
+ waitForState(testCollectionName, notLeaders.get(0).getName(), RECOVERING, 10000);
+
+ System.clearProperty("solr.cloud.wait-for-updates-with-stale-state-pause");
+ JettySolrRunner notLeaderJetty = getJettyOnPort(getReplicaPort(notLeaders.get(0)));
+ ChaosMonkey.stop(notLeaderJetty);
+
+ ChaosMonkey.start(notLeaderJetty);
+ ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, 100);
+ assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 2);
+ } finally {
+ System.clearProperty("solr.cloud.wait-for-updates-with-stale-state-pause");
+ }
+
+ // try to clean up
+ attemptCollectionDelete(cloudClient, testCollectionName);
+ }
+
protected void testMinRf() throws Exception {
// create a collection that has 1 shard and 3 replicas
String testCollectionName = "collMinRf_1x3";
@@ -209,6 +270,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
sendDoc(1, 2);
+ JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(testCollectionName, "shard1", 1000)));
List<Replica> notLeaders =
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
assertTrue("Expected 2 non-leader replicas for collection " + testCollectionName
@@ -221,27 +283,21 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
// Now introduce a network partition between the leader and 1 replica, so a minRf of 2 is still achieved
log.info("partitioning replica : " + notLeaders.get(0));
SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
+ SocketProxy leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
proxy0.close();
+ // the leader can still connect to replica 2; closing leaderProxy prevents replica 1 from recovering
+ leaderProxy.close();
// indexing during a partition
- int achievedRf = sendDoc(2, 2);
+ int achievedRf = sendDoc(2, 2, leaderJetty);
assertEquals("Unexpected achieved replication factor", 2, achievedRf);
-
+ try (ZkShardTerms zkShardTerms = new ZkShardTerms(testCollectionName, "shard1", cloudClient.getZkStateReader().getZkClient())) {
+ assertFalse(zkShardTerms.canBecomeLeader(notLeaders.get(0).getName()));
+ }
Thread.sleep(sleepMsBeforeHealPartition);
-
- // Verify that the partitioned replica is DOWN
- ZkStateReader zkr = cloudClient.getZkStateReader();
- zkr.forceUpdateCollection(testCollectionName);; // force the state to be fresh
- ClusterState cs = zkr.getClusterState();
- Collection<Slice> slices = cs.getCollection(testCollectionName).getActiveSlices();
- Slice slice = slices.iterator().next();
- Replica partitionedReplica = slice.getReplica(notLeaders.get(0).getName());
- assertEquals("The partitioned replica did not get marked down",
- Replica.State.DOWN.toString(), partitionedReplica.getStr(ZkStateReader.STATE_PROP));
- log.info("un-partitioning replica : " + notLeaders.get(0));
-
proxy0.reopen();
+ leaderProxy.reopen();
notLeaders =
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
@@ -254,8 +310,10 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
proxy0.close();
SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
proxy1.close();
+ leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
+ leaderProxy.close();
- achievedRf = sendDoc(3, 2);
+ achievedRf = sendDoc(3, 2, leaderJetty);
assertEquals("Unexpected achieved replication factor", 1, achievedRf);
Thread.sleep(sleepMsBeforeHealPartition);
@@ -265,6 +323,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
proxy0.reopen();
proxy1.reopen();
+ leaderProxy.reopen();
notLeaders =
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
@@ -299,30 +358,32 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
Replica notLeader =
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive).get(0);
-
+ JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(testCollectionName, "shard1", 1000)));
+
// ok, now introduce a network partition between the leader and the replica
SocketProxy proxy = getProxyForReplica(notLeader);
+ SocketProxy leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
proxy.close();
-
+ leaderProxy.close();
+
// indexing during a partition
- sendDoc(2);
-
- // Have the partition last at least 1 sec
- // While this gives the impression that recovery is timing related, this is
- // really only
- // to give time for the state to be written to ZK before the test completes.
- // In other words,
- // without a brief pause, the test finishes so quickly that it doesn't give
- // time for the recovery process to kick-in
- Thread.sleep(sleepMsBeforeHealPartition);
+ sendDoc(2, null, leaderJetty);
+ // replica should publish itself as DOWN if the network is not healed after some amount of time
+ waitForState(testCollectionName, notLeader.getName(), DOWN, 10000);
proxy.reopen();
+ leaderProxy.reopen();
List<Replica> notLeaders =
ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
- sendDoc(3);
+ int achievedRf = sendDoc(3);
+ if (achievedRf == 1) {
+ // this case can happen when the leader reuses a connection that was established before the network partition
+ // TODO: Remove when SOLR-11776 gets committed
+ ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
+ }
// sent 3 docs in so far, verify they are on the leader and replica
assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 3);
@@ -349,21 +410,25 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
if (d % 10 == 0) {
if (hasPartition) {
proxy.reopen();
+ leaderProxy.reopen();
hasPartition = false;
} else {
if (d >= 10) {
proxy.close();
+ leaderProxy.close();
hasPartition = true;
Thread.sleep(sleepMsBeforeHealPartition);
}
}
}
- sendDoc(d + 4); // 4 is offset as we've already indexed 1-3
+ // always send doc directly to leader without going through proxy
+ sendDoc(d + 4, null, leaderJetty); // 4 is offset as we've already indexed 1-3
}
// restore connectivity if lost
if (hasPartition) {
proxy.reopen();
+ leaderProxy.reopen();
}
notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 2, maxWaitSecsToSeeAllActive);
@@ -384,7 +449,24 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
// try to clean up
attemptCollectionDelete(cloudClient, testCollectionName);
}
-
+
+ protected void waitForState(String collection, String replicaName, Replica.State state, long ms) throws KeeperException, InterruptedException {
+ TimeOut timeOut = new TimeOut(ms, TimeUnit.MILLISECONDS, TimeSource.CURRENT_TIME);
+ Replica.State replicaState = Replica.State.ACTIVE;
+ while (!timeOut.hasTimedOut()) {
+ ZkStateReader zkr = cloudClient.getZkStateReader();
+ zkr.forceUpdateCollection(collection);; // force the state to be fresh
+ ClusterState cs = zkr.getClusterState();
+ Collection<Slice> slices = cs.getCollection(collection).getActiveSlices();
+ Slice slice = slices.iterator().next();
+ Replica partitionedReplica = slice.getReplica(replicaName);
+ replicaState = partitionedReplica.getState();
+ if (replicaState == state) return;
+ }
+ assertEquals("Timeout waiting for state "+ state +" of replica " + replicaName + ", current state " + replicaState,
+ state, replicaState);
+ }
+
protected void testRf3() throws Exception {
// create a collection that has 1 shard but 2 replicas
String testCollectionName = "c8n_1x3";
@@ -400,27 +482,30 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
+ " but found " + notLeaders.size() + "; clusterState: "
+ printClusterStateInfo(testCollectionName),
notLeaders.size() == 2);
+ JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(getShardLeader(testCollectionName, "shard1", 1000)));
// ok, now introduce a network partition between the leader and the replica
SocketProxy proxy0 = getProxyForReplica(notLeaders.get(0));
+ SocketProxy leaderProxy = getProxyForReplica(getShardLeader(testCollectionName, "shard1", 1000));
proxy0.close();
+ leaderProxy.close();
// indexing during a partition
- sendDoc(2);
+ sendDoc(2, null, leaderJetty);
Thread.sleep(sleepMsBeforeHealPartition);
-
proxy0.reopen();
SocketProxy proxy1 = getProxyForReplica(notLeaders.get(1));
-
proxy1.close();
- sendDoc(3);
+ sendDoc(3, null, leaderJetty);
Thread.sleep(sleepMsBeforeHealPartition);
proxy1.reopen();
+
+ leaderProxy.reopen();
// sent 4 docs in so far, verify they are on the leader and replica
notLeaders = ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, maxWaitSecsToSeeAllActive);
@@ -578,8 +663,19 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
protected int sendDoc(int docId) throws Exception {
return sendDoc(docId, null);
}
-
+
+ // Send doc directly to a server (without going through proxy)
+ protected int sendDoc(int docId, Integer minRf, JettySolrRunner leaderJetty) throws IOException, SolrServerException {
+ try (HttpSolrClient solrClient = new HttpSolrClient.Builder(leaderJetty.getBaseUrl().toString()).build()) {
+ return sendDoc(docId, minRf, solrClient, cloudClient.getDefaultCollection());
+ }
+ }
+
protected int sendDoc(int docId, Integer minRf) throws Exception {
+ return sendDoc(docId, minRf, cloudClient, cloudClient.getDefaultCollection());
+ }
+
+ protected int sendDoc(int docId, Integer minRf, SolrClient solrClient, String collection) throws IOException, SolrServerException {
SolrInputDocument doc = new SolrInputDocument();
doc.addField(id, String.valueOf(docId));
doc.addField("a_t", "hello" + docId);
@@ -589,8 +685,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));
}
up.add(doc);
-
- return cloudClient.getMinAchievedReplicationFactor(cloudClient.getDefaultCollection(), cloudClient.request(up));
+ return cloudClient.getMinAchievedReplicationFactor(collection, solrClient.request(up, collection));
}
/**
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/test/org/apache/solr/cloud/LIRRollingUpdatesTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/LIRRollingUpdatesTest.java b/solr/core/src/test/org/apache/solr/cloud/LIRRollingUpdatesTest.java
new file mode 100644
index 0000000..a15406e
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/LIRRollingUpdatesTest.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.io.Writer;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.apache.solr.JSONTestUtil;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkCoreNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.util.TimeOut;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LIRRollingUpdatesTest extends SolrCloudTestCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static Map<URI, SocketProxy> proxies;
+ private static Map<URI, JettySolrRunner> jettys;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception { // configures a 3-node cluster and registers one SocketProxy per Jetty
+ configureCluster(3)
+ .addConfig("conf", configset("cloud-minimal"))
+ .configure();
+ // Add proxies: one SocketProxy per Jetty; both lookup maps below are keyed by the proxy's URL
+ proxies = new HashMap<>(cluster.getJettySolrRunners().size());
+ jettys = new HashMap<>(cluster.getJettySolrRunners().size());
+ for (JettySolrRunner jetty:cluster.getJettySolrRunners()) {
+ SocketProxy proxy = new SocketProxy();
+ jetty.setProxyPort(proxy.getListenPort()); // set before the restart below; presumably only picked up on start - confirm
+ cluster.stopJettySolrRunner(jetty);//TODO: Can we avoid this restart
+ cluster.startJettySolrRunner(jetty);
+ proxy.open(jetty.getBaseUrl().toURI()); // the proxy is opened against the Jetty's own base URL after the restart
+ LOG.info("Adding proxy for URL: " + jetty.getBaseUrl() + ". Proxy: " + proxy.getUrl());
+ proxies.put(proxy.getUrl(), proxy);
+ jettys.put(proxy.getUrl(), jetty); // same key as the proxies map, so a replica's URL resolves to both
+ }
+ }
+
+
+  /**
+   * Closes every proxy opened by {@code setupCluster} and clears the static lookup maps.
+   * Tolerates a setup that failed before the maps were created, so the original failure is not
+   * masked by a NullPointerException thrown from here.
+   */
+  @AfterClass
+  public static void tearDownCluster() throws Exception {
+    if (proxies != null) {
+      for (SocketProxy proxy : proxies.values()) {
+        proxy.close();
+      }
+    }
+    proxies = null;
+    jettys = null;
+  }
+
+ @Test
+ public void testNewReplicaOldLeader() throws Exception { // both replicas start with lirVersion=old; the non-leader is later restarted without it
+
+ String collection = "testNewReplicaOldLeader";
+ CollectionAdminRequest.createCollection(collection, 1, 2)
+ .setCreateNodeSet("") // presumably creates no replicas up front; they are added explicitly below - confirm
+ .process(cluster.getSolrClient());
+ Properties oldLir = new Properties();
+ oldLir.setProperty("lirVersion", "old");
+
+ CollectionAdminRequest
+ .addReplicaToShard(collection, "shard1")
+ .setProperties(oldLir)
+ .setNode(cluster.getJettySolrRunner(0).getNodeName())
+ .process(cluster.getSolrClient());
+
+ CollectionAdminRequest
+ .addReplicaToShard(collection, "shard1")
+ .setProperties(oldLir)
+ .setNode(cluster.getJettySolrRunner(1).getNodeName())
+ .process(cluster.getSolrClient());
+ addDocs(collection, 2, 0); // indexes docs with ids 0 and 1, then commits
+
+ Slice shard1 = getCollectionState(collection).getSlice("shard1");
+ // introduce a network partition between leader & replica by closing both proxies
+ Replica notLeader = shard1.getReplicas(x -> x != shard1.getLeader()).get(0);
+ assertTrue(runInOldLIRMode(collection, "shard1", notLeader));
+ getProxyForReplica(notLeader).close();
+ getProxyForReplica(shard1.getLeader()).close();
+
+ addDoc(collection, 2, getJettyForReplica(shard1.getLeader())); // sent via the leader's own Jetty URL rather than the closed proxy
+ waitForState("Replica " + notLeader.getName() + " is not put as DOWN", collection,
+ (liveNodes, collectionState) ->
+ collectionState.getSlice("shard1").getReplica(notLeader.getName()).getState() == Replica.State.DOWN);
+ getProxyForReplica(shard1.getLeader()).reopen();
+ getProxyForReplica(notLeader).reopen();
+ // make sure that, when the new replica works with the old leader, it can still recover normally
+ waitForState("Timeout waiting for recovering", collection, clusterShape(1, 2));
+ assertDocsExistInAllReplicas(Collections.singletonList(notLeader), collection, 0, 2);
+
+ // make sure that, when the new replica restarts during LIR, it can still recover normally (by looking at the LIR node)
+ getProxyForReplica(notLeader).close();
+ getProxyForReplica(shard1.getLeader()).close();
+
+ addDoc(collection, 3, getJettyForReplica(shard1.getLeader()));
+ waitForState("Replica " + notLeader.getName() + " is not put as DOWN", collection,
+ (liveNodes, collectionState) ->
+ collectionState.getSlice("shard1").getReplica(notLeader.getName()).getState() == Replica.State.DOWN);
+
+ JettySolrRunner notLeaderJetty = getJettyForReplica(notLeader);
+ notLeaderJetty.stop();
+ waitForState("Node did not leave", collection, (liveNodes, collectionState) -> liveNodes.size() == 2);
+ upgrade(notLeaderJetty); // removes lirVersion from the stopped node's core.properties files
+ notLeaderJetty.start();
+
+ getProxyForReplica(shard1.getLeader()).reopen();
+ getProxyForReplica(notLeader).reopen();
+ waitForState("Timeout waiting for recovering", collection, clusterShape(1, 2));
+ assertFalse(runInOldLIRMode(collection, "shard1", notLeader)); // after the restart the replica is expected to be registered in the shard terms
+ assertDocsExistInAllReplicas(Collections.singletonList(notLeader), collection, 0, 3);
+
+ CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
+ }
+
+  /**
+   * Scenario: the first replica is added without the {@code lirVersion=old} property and the second one
+   * with it. After partitioning both and indexing a document through the leader's Jetty, the old-mode
+   * replica must be marked DOWN, recover once the proxies are reopened, leave no LIR znode behind, and
+   * must not appear in the shard terms.
+   *
+   * Annotated with {@code @Test} like the other scenarios in this class.
+   */
+  @Test
+  public void testNewLeaderOldReplica() throws Exception {
+    // in case of new leader & old replica, new leader can still put old replica into LIR
+
+    String collection = "testNewLeaderOldReplica";
+    CollectionAdminRequest.createCollection(collection, 1, 2)
+        .setCreateNodeSet("")
+        .process(cluster.getSolrClient());
+    Properties oldLir = new Properties();
+    oldLir.setProperty("lirVersion", "old");
+
+    // first replica: no lirVersion property
+    CollectionAdminRequest
+        .addReplicaToShard(collection, "shard1")
+        .setNode(cluster.getJettySolrRunner(0).getNodeName())
+        .process(cluster.getSolrClient());
+    waitForState("Timeout waiting for shard1 become active", collection, (liveNodes, collectionState) -> {
+      Slice shard1 = collectionState.getSlice("shard1");
+      return shard1.getReplicas().size() == 1 && shard1.getLeader() != null;
+    });
+
+    // second replica: explicitly flagged with lirVersion=old
+    CollectionAdminRequest
+        .addReplicaToShard(collection, "shard1")
+        .setProperties(oldLir)
+        .setNode(cluster.getJettySolrRunner(1).getNodeName())
+        .process(cluster.getSolrClient());
+
+    Slice shard1 = getCollectionState(collection).getSlice("shard1");
+    Replica notLeader = shard1.getReplicas(x -> x != shard1.getLeader()).get(0);
+    Replica leader = shard1.getLeader();
+
+    assertTrue(runInOldLIRMode(collection, "shard1", notLeader));
+    assertFalse(runInOldLIRMode(collection, "shard1", leader));
+
+    addDocs(collection, 2, 0);
+    // partition both nodes by closing their proxies
+    getProxyForReplica(notLeader).close();
+    getProxyForReplica(leader).close();
+
+    // index through the leader's own Jetty URL rather than its closed proxy
+    JettySolrRunner leaderJetty = getJettyForReplica(leader);
+    addDoc(collection, 2, leaderJetty);
+    waitForState("Replica " + notLeader.getName() + " is not put as DOWN", collection,
+        (liveNodes, collectionState) ->
+            collectionState.getSlice("shard1").getReplica(notLeader.getName()).getState() == Replica.State.DOWN);
+    // wait a little bit
+    Thread.sleep(500);
+    getProxyForReplica(notLeader).reopen();
+    getProxyForReplica(leader).reopen();
+
+    waitForState("Timeout waiting for recovering", collection, clusterShape(1, 2));
+    assertDocsExistInAllReplicas(Collections.singletonList(notLeader), collection, 0, 2);
+
+    // ensure that after recovery, the upgraded replica will clean its LIR status cause it is no longer needed
+    assertFalse(cluster.getSolrClient().getZkStateReader().getZkClient().exists(
+        ZkController.getLeaderInitiatedRecoveryZnodePath(collection, "shard1", notLeader.getName()), true));
+    // ensure that, leader should not register other replica's term
+    try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
+      assertFalse(zkShardTerms.getTerms().containsKey(notLeader.getName()));
+    }
+    CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
+  }
+
+ public void testLeaderAndMixedReplicas(boolean leaderInOldMode) throws Exception { // shared scenario body; leaderInOldMode decides whether the first replica gets lirVersion=old
+ // in case of new leader and mixed old replica and new replica, new leader can still put all of them into recovery
+ // step1 : setup collection
+ String collection = "testMixedReplicas-"+leaderInOldMode;
+ CollectionAdminRequest.createCollection(collection, 1, 2)
+ .setCreateNodeSet("")
+ .process(cluster.getSolrClient());
+ Properties oldLir = new Properties();
+ oldLir.setProperty("lirVersion", "old");
+
+ if (leaderInOldMode) {
+ CollectionAdminRequest
+ .addReplicaToShard(collection, "shard1")
+ .setProperties(oldLir)
+ .setNode(cluster.getJettySolrRunner(0).getNodeName())
+ .process(cluster.getSolrClient());
+ } else {
+ CollectionAdminRequest
+ .addReplicaToShard(collection, "shard1")
+ .setNode(cluster.getJettySolrRunner(0).getNodeName())
+ .process(cluster.getSolrClient());
+ }
+
+ waitForState("Timeout waiting for shard1 become active", collection, clusterShape(1, 1));
+
+ CollectionAdminRequest
+ .addReplicaToShard(collection, "shard1")
+ .setProperties(oldLir)
+ .setNode(cluster.getJettySolrRunner(1).getNodeName())
+ .process(cluster.getSolrClient());
+
+ CollectionAdminRequest
+ .addReplicaToShard(collection, "shard1")
+ .setNode(cluster.getJettySolrRunner(2).getNodeName())
+ .process(cluster.getSolrClient());
+
+ Slice shard1 = getCollectionState(collection).getSlice("shard1");
+ Replica replicaInOldMode = shard1.getReplicas(x -> x != shard1.getLeader()).get(0);
+ Replica replicaInNewMode = shard1.getReplicas(x -> x != shard1.getLeader()).get(1);
+ Replica leader = shard1.getLeader();
+
+ assertEquals(leaderInOldMode, runInOldLIRMode(collection, "shard1", leader));
+ if (!runInOldLIRMode(collection, "shard1", replicaInOldMode)) { // the two non-leaders were picked by position; swap them if the guess was wrong
+ Replica temp = replicaInOldMode;
+ replicaInOldMode = replicaInNewMode;
+ replicaInNewMode = temp;
+ }
+ assertTrue(runInOldLIRMode(collection, "shard1", replicaInOldMode));
+ assertFalse(runInOldLIRMode(collection, "shard1", replicaInNewMode));
+
+ addDocs(collection, 2, 0);
+
+ // step2 : introduce network partition then add doc, replicas should be put into recovery
+ getProxyForReplica(replicaInOldMode).close();
+ getProxyForReplica(replicaInNewMode).close();
+ getProxyForReplica(leader).close();
+
+ JettySolrRunner leaderJetty = getJettyForReplica(leader);
+ addDoc(collection, 2, leaderJetty); // sent via the leader's own Jetty URL rather than the closed proxy
+
+ Replica finalReplicaInOldMode = replicaInOldMode; // effectively-final copy for use inside the lambda below
+ waitForState("Replica " + replicaInOldMode.getName() + " is not put as DOWN", collection,
+ (liveNodes, collectionState) ->
+ collectionState.getSlice("shard1").getReplica(finalReplicaInOldMode.getName()).getState() == Replica.State.DOWN);
+
+ // wait a little bit
+ Thread.sleep(500);
+ getProxyForReplica(replicaInOldMode).reopen();
+ getProxyForReplica(replicaInNewMode).reopen();
+ getProxyForReplica(leader).reopen();
+
+ waitForState("Timeout waiting for recovering", collection, clusterShape(1, 3));
+ assertDocsExistInAllReplicas(Arrays.asList(replicaInNewMode, replicaInOldMode), collection, 0, 2);
+
+ addDocs(collection, 3, 3); // indexes docs with ids 3, 4 and 5, then commits
+
+ // ensure that, leader should not register other replica's term
+ try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
+ assertFalse(zkShardTerms.getTerms().containsKey(replicaInOldMode.getName()));
+ }
+
+ // step3 : upgrade the replica running in old mode to the new mode
+ getProxyForReplica(leader).close();
+ getProxyForReplica(replicaInOldMode).close();
+ addDoc(collection, 6, leaderJetty);
+ JettySolrRunner oldJetty = getJettyForReplica(replicaInOldMode);
+ oldJetty.stop();
+ waitForState("Node did not leave", collection, (liveNodes, collectionState)
+ -> liveNodes.size() == 2);
+ upgrade(oldJetty); // removes lirVersion from the stopped node's core.properties files
+
+ oldJetty.start();
+ getProxyForReplica(leader).reopen();
+ getProxyForReplica(replicaInOldMode).reopen();
+
+ waitForState("Timeout waiting for recovering", collection, clusterShape(1, 3));
+ assertDocsExistInAllReplicas(Arrays.asList(replicaInNewMode, replicaInOldMode), collection, 0, 6);
+
+ CollectionAdminRequest.deleteCollection(collection).process(cluster.getSolrClient());
+ }
+
+  /** Runs the mixed-replica scenario with the first replica added without the {@code lirVersion=old} property. */
+  @Test
+  public void testNewLeaderAndMixedReplicas() throws Exception {
+    final boolean leaderInOldMode = false;
+    testLeaderAndMixedReplicas(leaderInOldMode);
+  }
+
+  /** Runs the mixed-replica scenario with the first replica added with the {@code lirVersion=old} property. */
+  @Test
+  public void testOldLeaderAndMixedReplicas() throws Exception {
+    final boolean leaderInOldMode = true;
+    testLeaderAndMixedReplicas(leaderInOldMode);
+  }
+
+  /**
+   * Removes the {@code lirVersion} entry from every {@code core.properties} file found one level below
+   * the given runner's Solr home, rewriting each file in place.
+   *
+   * The operation stays best-effort: a file that cannot be read is left untouched and a failed rewrite
+   * does not abort the loop. Failures are logged instead of being silently dropped.
+   *
+   * @param solrRunner the Jetty whose core directories are scanned
+   */
+  private void upgrade(JettySolrRunner solrRunner) {
+    File[] corePaths = new File(solrRunner.getSolrHome()).listFiles();
+    if (corePaths == null) {
+      // listFiles() returns null when the path is not a directory or cannot be read
+      LOG.warn("Could not list core directories under {}", solrRunner.getSolrHome());
+      return;
+    }
+    for (File corePath : corePaths) {
+      File coreProperties = new File(corePath, "core.properties");
+      if (!coreProperties.exists()) continue;
+      Properties properties = new Properties();
+
+      try (Reader reader = new InputStreamReader(new FileInputStream(coreProperties), "UTF-8")) {
+        properties.load(reader);
+      } catch (Exception e) {
+        // Do not rewrite a file we failed to read: that would discard its contents.
+        LOG.warn("Could not read {}; leaving it untouched", coreProperties, e);
+        continue;
+      }
+      properties.remove("lirVersion");
+      try (Writer writer = new OutputStreamWriter(new FileOutputStream(coreProperties), "UTF-8")) {
+        properties.store(writer, "Upgraded");
+      } catch (Exception e) {
+        LOG.warn("Could not rewrite {}", coreProperties, e);
+      }
+    }
+  }
+
+  /**
+   * Asserts that every document id in {@code [firstDocId, lastDocId]} can be fetched from the current
+   * shard1 leader and from each of the given replicas.
+   *
+   * All HTTP clients are created inside the guarded region so that none of them leaks when creating a
+   * later client throws.
+   *
+   * @param notLeaders         replicas to check in addition to the leader
+   * @param testCollectionName collection to query
+   * @param firstDocId         first document id to check (inclusive)
+   * @param lastDocId          last document id to check (inclusive)
+   */
+  protected void assertDocsExistInAllReplicas(List<Replica> notLeaders,
+                                              String testCollectionName, int firstDocId, int lastDocId)
+      throws Exception {
+    Replica leader =
+        cluster.getSolrClient().getZkStateReader().getLeaderRetry(testCollectionName, "shard1", 10000);
+    List<HttpSolrClient> replicas = new ArrayList<>(notLeaders.size());
+
+    try (HttpSolrClient leaderSolr = getHttpSolrClient(leader, testCollectionName)) {
+      try {
+        for (Replica r : notLeaders) {
+          replicas.add(getHttpSolrClient(r, testCollectionName));
+        }
+        for (int d = firstDocId; d <= lastDocId; d++) {
+          String docId = String.valueOf(d);
+          assertDocExists(leaderSolr, testCollectionName, docId);
+          for (HttpSolrClient replicaSolr : replicas) {
+            assertDocExists(replicaSolr, testCollectionName, docId);
+          }
+        }
+      } finally {
+        // Closes whichever replica clients were created, even if the list is only partially filled.
+        for (HttpSolrClient replicaSolr : replicas) {
+          replicaSolr.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Fetches the document by id from the given client and fails the test unless
+   * {@code JSONTestUtil.matchObj} returns null for the {@code /id} path of the returned {@code doc}.
+   */
+  protected void assertDocExists(HttpSolrClient solr, String coll, String docId) throws Exception {
+    NamedList response = realTimeGetDocId(solr, docId);
+    String mismatch = JSONTestUtil.matchObj("/id", response.get("doc"), docId);
+    boolean found = (mismatch == null);
+    assertTrue("Doc with id=" + docId + " not found in " + solr.getBaseURL()
+        + " due to: " + mismatch + "; rsp="+response, found);
+  }
+
+  /** Sends a non-distributed {@code /get} request for the given id to the given client and returns the raw response. */
+  private NamedList realTimeGetDocId(HttpSolrClient solr, String docId) throws SolrServerException, IOException {
+    return solr.request(new QueryRequest(params("qt", "/get", "id", docId, "distrib", "false")));
+  }
+
+  /** Builds an HTTP client whose URL is the replica's base URL followed by {@code "/"} and the collection name. */
+  protected HttpSolrClient getHttpSolrClient(Replica replica, String coll) throws Exception {
+    String replicaBaseUrl = new ZkCoreNodeProps(replica).getBaseUrl();
+    return getHttpSolrClient(replicaBaseUrl + "/" + coll);
+  }
+
+  /**
+   * Polls the supplier every 100 ms until it yields a value equal to {@code expected}, or asserts
+   * equality one last time once the timeout has elapsed.
+   *
+   * Values are compared with {@code equals} semantics (null-safe), not by reference, so boxed numbers
+   * and strings match on value.
+   *
+   * @param waitTimeInSecs maximum time to wait, in seconds
+   * @param expected       value the supplier is expected to produce
+   * @param supplier       source of the current value; called once per poll and once more on timeout
+   */
+  private <T> void waitFor(int waitTimeInSecs, T expected, Supplier<T> supplier) throws InterruptedException {
+    TimeOut timeOut = new TimeOut(waitTimeInSecs, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource());
+    while (!timeOut.hasTimedOut()) {
+      if (java.util.Objects.equals(expected, supplier.get())) return;
+      Thread.sleep(100);
+    }
+    assertEquals(expected, supplier.get());
+  }
+
+  /**
+   * Reports whether the replica is treated as running in old LIR mode, which this test defines as
+   * "not registered in the shard's {@code ZkShardTerms}".
+   */
+  private boolean runInOldLIRMode(String collection, String shard, Replica replica) {
+    try (ZkShardTerms terms = new ZkShardTerms(collection, shard, cluster.getZkClient())) {
+      boolean hasTerm = terms.registered(replica.getName());
+      return !hasTerm;
+    }
+  }
+
+ private void addDoc(String collection, int docId, JettySolrRunner solrRunner) throws IOException, SolrServerException {
+ try (HttpSolrClient solrClient = new HttpSolrClient.Builder(solrRunner.getBaseUrl().toString()).build()) {
+ solrClient.add(collection, new SolrInputDocument("id", String.valueOf(docId), "fieldName_s", String.valueOf(docId)));
+ }
+ }
+
+  /**
+   * Adds {@code numDocs} documents with consecutive ids starting at {@code startId} through the cluster
+   * client in a single batch, then commits the collection.
+   */
+  private void addDocs(String collection, int numDocs, int startId) throws SolrServerException, IOException {
+    List<SolrInputDocument> batch = new ArrayList<>(numDocs);
+    for (int offset = 0; offset < numDocs; offset++) {
+      String idValue = String.valueOf(startId + offset);
+      batch.add(new SolrInputDocument("id", idValue, "fieldName_s", idValue));
+    }
+    cluster.getSolrClient().add(collection, batch);
+    cluster.getSolrClient().commit(collection);
+  }
+
+
+  /**
+   * Looks up the Jetty registered for the replica's base URL.
+   *
+   * The {@code jettys} map uses the same keys as the {@code proxies} map, so the lookup applies the same
+   * trailing-slash fallback as {@code getProxyForReplica}: if the URL as stored on the replica is not
+   * found and does not end with {@code "/"}, it is retried with a slash appended.
+   *
+   * @param replica replica whose {@code base_url} property identifies the node
+   * @return the matching Jetty; the test fails if none is registered
+   */
+  protected JettySolrRunner getJettyForReplica(Replica replica) throws Exception {
+    String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+    assertNotNull(replicaBaseUrl);
+    URL baseUrl = new URL(replicaBaseUrl);
+
+    JettySolrRunner jetty = jettys.get(baseUrl.toURI());
+    if (jetty == null && !baseUrl.toExternalForm().endsWith("/")) {
+      baseUrl = new URL(baseUrl.toExternalForm() + "/");
+      jetty = jettys.get(baseUrl.toURI());
+    }
+    assertNotNull("No Jetty found for " + baseUrl + "!", jetty);
+    return jetty;
+  }
+
+  /**
+   * Looks up the proxy registered for the replica's base URL, retrying with a trailing {@code "/"}
+   * appended when the first lookup misses and the URL does not already end with one.
+   * The test fails if no proxy is found.
+   */
+  protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
+    String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
+    assertNotNull(replicaBaseUrl);
+    URL baseUrl = new URL(replicaBaseUrl);
+
+    SocketProxy found = proxies.get(baseUrl.toURI());
+    if (found == null) {
+      boolean hasTrailingSlash = baseUrl.toExternalForm().endsWith("/");
+      if (!hasTrailingSlash) {
+        // second attempt: same URL with a slash appended
+        baseUrl = new URL(baseUrl.toExternalForm() + "/");
+        found = proxies.get(baseUrl.toURI());
+      }
+    }
+    assertNotNull("No proxy found for " + baseUrl + "!", found);
+    return found;
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
deleted file mode 100644
index b4e4860..0000000
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud;
-
-import org.apache.http.NoHttpResponseException;
-import org.apache.solr.client.solrj.embedded.JettySolrRunner;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.util.RTimer;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.lang.invoke.MethodHandles;
-import java.util.List;
-
-public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest {
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private static final long sleepMsBeforeHealPartition = 2000L;
-
- private final boolean onlyLeaderIndexes = random().nextBoolean();
-
- public LeaderInitiatedRecoveryOnCommitTest() {
- super();
- sliceCount = 1;
- fixShardCount(4);
- }
-
- @Override
- protected boolean useTlogReplicas() {
- return onlyLeaderIndexes;
- }
-
- @Override
- @Test
- public void test() throws Exception {
- oneShardTest();
- multiShardTest();
- }
-
- private void multiShardTest() throws Exception {
-
- log.info("Running multiShardTest");
-
- // create a collection that has 2 shard and 2 replicas
- String testCollectionName = "c8n_2x2_commits";
- createCollection(testCollectionName, "conf1", 2, 2, 1);
- cloudClient.setDefaultCollection(testCollectionName);
-
- List<Replica> notLeaders =
- ensureAllReplicasAreActive(testCollectionName, "shard1", 2, 2, 30);
- assertTrue("Expected 1 replicas for collection " + testCollectionName
- + " but found " + notLeaders.size() + "; clusterState: "
- + printClusterStateInfo(),
- notLeaders.size() == 1);
-
- log.info("All replicas active for "+testCollectionName);
-
- // let's put the leader in its own partition, no replicas can contact it now
- Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
- log.info("Creating partition to leader at "+leader.getCoreUrl());
- SocketProxy leaderProxy = getProxyForReplica(leader);
- leaderProxy.close();
-
- // let's find the leader of shard2 and ask him to commit
- Replica shard2Leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard2");
- sendCommitWithRetry(shard2Leader);
-
- Thread.sleep(sleepMsBeforeHealPartition);
-
- cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName); // get the latest state
- leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
- assertSame("Leader was not active", Replica.State.ACTIVE, leader.getState());
-
- log.info("Healing partitioned replica at "+leader.getCoreUrl());
- leaderProxy.reopen();
- Thread.sleep(sleepMsBeforeHealPartition);
-
- // try to clean up
- attemptCollectionDelete(cloudClient, testCollectionName);
-
- log.info("multiShardTest completed OK");
- }
-
- private void oneShardTest() throws Exception {
- log.info("Running oneShardTest");
-
- // create a collection that has 1 shard and 3 replicas
- String testCollectionName = "c8n_1x3_commits";
- createCollection(testCollectionName, "conf1", 1, 3, 1);
- cloudClient.setDefaultCollection(testCollectionName);
-
- List<Replica> notLeaders =
- ensureAllReplicasAreActive(testCollectionName, "shard1", 1, 3, 30);
- assertTrue("Expected 2 replicas for collection " + testCollectionName
- + " but found " + notLeaders.size() + "; clusterState: "
- + printClusterStateInfo(),
- notLeaders.size() == 2);
-
- log.info("All replicas active for "+testCollectionName);
-
- // let's put the leader in its own partition, no replicas can contact it now
- Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
- log.info("Creating partition to leader at "+leader.getCoreUrl());
- SocketProxy leaderProxy = getProxyForReplica(leader);
- leaderProxy.close();
-
- Replica replica = notLeaders.get(0);
- sendCommitWithRetry(replica);
- Thread.sleep(sleepMsBeforeHealPartition);
-
- cloudClient.getZkStateReader().forceUpdateCollection(testCollectionName); // get the latest state
- leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, "shard1");
- assertSame("Leader was not active", Replica.State.ACTIVE, leader.getState());
-
- log.info("Healing partitioned replica at "+leader.getCoreUrl());
- leaderProxy.reopen();
- Thread.sleep(sleepMsBeforeHealPartition);
-
- // try to clean up
- attemptCollectionDelete(cloudClient, testCollectionName);
-
- log.info("oneShardTest completed OK");
- }
-
- /**
- * Overrides the parent implementation to install a SocketProxy in-front of the Jetty server.
- */
- @Override
- public JettySolrRunner createJetty(File solrHome, String dataDir,
- String shardList, String solrConfigOverride, String schemaOverride, Replica.Type replicaType)
- throws Exception {
- return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride, replicaType);
- }
-
- protected void sendCommitWithRetry(Replica replica) throws Exception {
- String replicaCoreUrl = replica.getCoreUrl();
- log.info("Sending commit request to: "+replicaCoreUrl);
- final RTimer timer = new RTimer();
- try (HttpSolrClient client = getHttpSolrClient(replicaCoreUrl)) {
- try {
- client.commit();
-
- log.info("Sent commit request to {} OK, took {}ms", replicaCoreUrl, timer.getTime());
- } catch (Exception exc) {
- Throwable rootCause = SolrException.getRootCause(exc);
- if (rootCause instanceof NoHttpResponseException) {
- log.warn("No HTTP response from sending commit request to "+replicaCoreUrl+
- "; will re-try after waiting 3 seconds");
- Thread.sleep(3000);
- client.commit();
- log.info("Second attempt at sending commit to "+replicaCoreUrl+" succeeded.");
- } else {
- throw exc;
- }
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnShardRestartTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnShardRestartTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnShardRestartTest.java
index 8064513..12bde17 100644
--- a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnShardRestartTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnShardRestartTest.java
@@ -18,6 +18,7 @@ package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.Map;
+import java.util.Properties;
import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.lucene.util.LuceneTestCase.Slow;
@@ -26,6 +27,7 @@ import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.RemoteSolrException;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.SolrZkClient;
@@ -45,6 +47,7 @@ import org.slf4j.LoggerFactory;
@Slow
@Nightly
@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-10071")
+@Deprecated
public class LeaderInitiatedRecoveryOnShardRestartTest extends AbstractFullDistribZkTestBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -86,7 +89,14 @@ public class LeaderInitiatedRecoveryOnShardRestartTest extends AbstractFullDistr
String testCollectionName = "all_in_lir";
String shardId = "shard1";
- createCollection(testCollectionName, "conf1", 1, 3, 1);
+ CollectionAdminRequest.createCollection(testCollectionName, "conf1", 1, 3)
+ .setCreateNodeSet("")
+ .process(cloudClient);
+ Properties oldLir = new Properties();
+ oldLir.setProperty("lirVersion", "old");
+ for (int i = 0; i < 3; i++) {
+ CollectionAdminRequest.addReplicaToShard(testCollectionName, "shard1").setProperties(oldLir).process(cloudClient);
+ }
waitForRecoveriesToFinish(testCollectionName, false);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java b/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java
index 0fbc0a1..f099fc6 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestLeaderInitiatedRecoveryThread.java
@@ -34,6 +34,7 @@ import org.apache.zookeeper.data.Stat;
/**
* Test for {@link LeaderInitiatedRecoveryThread}
*/
+@Deprecated
@SolrTestCaseJ4.SuppressSSL
public class TestLeaderInitiatedRecoveryThread extends AbstractFullDistribZkTestBase {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
new file mode 100644
index 0000000..c205a50
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.util.TimeOut;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZkShardTermsTest extends SolrCloudTestCase {
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(1)
+ .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
+ .configure();
+ }
+
+ public void testParticipationOfReplicas() throws IOException, SolrServerException, InterruptedException {
+ String collection = "collection1";
+ try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard2", cluster.getZkClient())) { // seed terms before the collection exists
+ zkShardTerms.registerTerm("replica1");
+ zkShardTerms.registerTerm("replica2");
+ zkShardTerms.ensureTermsIsHigher("replica1", Collections.singleton("replica2"));
+ }
+
+ // When a new collection is created, the old term nodes should be removed: both shards must end up with two replicas at term 0
+ CollectionAdminRequest.createCollection(collection, 2, 2)
+ .setCreateNodeSet(cluster.getJettySolrRunner(0).getNodeName())
+ .setMaxShardsPerNode(1000)
+ .process(cluster.getSolrClient());
+ ZkController zkController = cluster.getJettySolrRunners().get(0).getCoreContainer().getZkController();
+ waitFor(2, () -> zkController.getShardTerms(collection, "shard1").getTerms().size());
+ assertArrayEquals(new Long[]{0L, 0L}, zkController.getShardTerms(collection, "shard1").getTerms().values().toArray(new Long[2]));
+ waitFor(2, () -> zkController.getShardTerms(collection, "shard2").getTerms().size());
+ assertArrayEquals(new Long[]{0L, 0L}, zkController.getShardTerms(collection, "shard2").getTerms().values().toArray(new Long[2]));
+ }
+
+ public void testRegisterTerm() throws InterruptedException {
+ String collection = "registerTerm";
+ // try-with-resources: both ZkShardTerms views of shard1 are closed even when an assertion below fails
+ try (ZkShardTerms rep1Terms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
+ ZkShardTerms rep2Terms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
+ rep1Terms.registerTerm("rep1");
+ rep2Terms.registerTerm("rep2");
+ try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
+ assertEquals(0L, zkShardTerms.getTerm("rep1"));
+ assertEquals(0L, zkShardTerms.getTerm("rep2"));
+ }
+ waitFor(2, () -> rep1Terms.getTerms().size());
+ rep1Terms.ensureTermsIsHigher("rep1", Collections.singleton("rep2"));
+ assertEquals(1L, rep1Terms.getTerm("rep1"));
+ assertEquals(0L, rep1Terms.getTerm("rep2"));
+
+ // assert registerTerm does not override current value
+ rep1Terms.registerTerm("rep1");
+ assertEquals(1L, rep1Terms.getTerm("rep1"));
+
+ waitFor(1L, () -> rep2Terms.getTerm("rep1"));
+ rep2Terms.setEqualsToMax("rep2");
+ assertEquals(1L, rep2Terms.getTerm("rep2"));
+ rep2Terms.registerTerm("rep2");
+ assertEquals(1L, rep2Terms.getTerm("rep2"));
+
+ // zkShardTerms must stay updated by watcher
+ Map<String, Long> expectedTerms = new HashMap<>();
+ expectedTerms.put("rep1", 1L);
+ expectedTerms.put("rep2", 1L);
+
+ TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource());
+ while (!timeOut.hasTimedOut()) {
+ if (Objects.equals(expectedTerms, rep1Terms.getTerms()) && Objects.equals(expectedTerms, rep2Terms.getTerms())) break;
+ Thread.sleep(100); // pause between polls instead of spinning on the CPU for up to 10 seconds
+ }
+ assertEquals("Expected zkShardTerms must stay updated", expectedTerms, rep1Terms.getTerms()); // assert the state itself, not the clock
+ assertEquals("Expected zkShardTerms must stay updated", expectedTerms, rep2Terms.getTerms());
+ }
+ }
+
+ @Test
+ public void testRaceConditionOnUpdates() throws InterruptedException {
+ String collection = "raceConditionOnUpdates";
+ List<String> replicas = Arrays.asList("rep1", "rep2", "rep3", "rep4");
+ for (String replica : replicas) {
+ try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
+ zkShardTerms.registerTerm(replica);
+ }
+ }
+
+ List<String> failedReplicas = new ArrayList<>(replicas);
+ Collections.shuffle(failedReplicas, random());
+ while (failedReplicas.size() > 2) {
+ failedReplicas.remove(0);
+ }
+ AtomicBoolean stop = new AtomicBoolean(false);
+ Thread[] threads = new Thread[failedReplicas.size()];
+ for (int i = 0; i < failedReplicas.size(); i++) {
+ String replica = failedReplicas.get(i);
+ threads[i] = new Thread(() -> {
+ try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
+ while (!stop.get()) {
+ Thread.sleep(random().nextInt(200));
+ zkShardTerms.setEqualsToMax(replica);
+ }
+ } catch (InterruptedException e) {
+ // restore the interrupt flag and let the updater end instead of swallowing the interrupt and looping on
+ Thread.currentThread().interrupt();
+ }
+ });
+ threads[i].start();
+ }
+
+ long maxTerm = 0;
+ try (ZkShardTerms shardTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient())) {
+ shardTerms.registerTerm("leader");
+ TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource());
+ while (!timeOut.hasTimedOut()) {
+ maxTerm++;
+ assertEquals(shardTerms.getTerms().get("leader"), Collections.max(shardTerms.getTerms().values()));
+ Thread.sleep(100);
+ }
+ assertTrue(maxTerm >= Collections.max(shardTerms.getTerms().values()));
+ } finally { // stop and join the updater threads even when an assertion above fails, so none are left running
+ stop.set(true);
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ }
+ }
+
+ public void testCoreTermWatcher() throws InterruptedException {
+ String collection = "coreTermWatcher";
+ ZkShardTerms leaderTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
+ leaderTerms.registerTerm("leader");
+ ZkShardTerms replicaTerms = new ZkShardTerms(collection, "shard1", cluster.getZkClient());
+ AtomicInteger count = new AtomicInteger(0);
+ // this will get called at most 3 times: it returns false on its 3rd call, after which the test expects no listener to remain
+ ZkShardTerms.CoreTermWatcher watcher = terms -> count.incrementAndGet() < 3;
+ replicaTerms.addListener(watcher);
+ replicaTerms.registerTerm("replica");
+ waitFor(1, count::get);
+ leaderTerms.ensureTermsIsHigher("leader", Collections.singleton("replica"));
+ waitFor(2, count::get);
+ replicaTerms.setEqualsToMax("replica");
+ waitFor(3, count::get);
+ assertEquals(0, replicaTerms.getNumListeners());
+
+ leaderTerms.close();
+ replicaTerms.close();
+ }
+
+ public void testEnsureTermsIsHigher() { // pure in-memory check of Terms.increaseTerms, no ZooKeeper access in this method
+ Map<String, Long> map = new HashMap<>();
+ map.put("leader", 0L);
+ ZkShardTerms.Terms terms = new ZkShardTerms.Terms(map, 0);
+ terms = terms.increaseTerms("leader", Collections.singleton("replica")); // "replica" has no entry in the map
+ assertEquals(1L, terms.getTerm("leader").longValue()); // the leader's term must still be bumped from 0 to 1
+ }
+
+ private <T> void waitFor(T expected, Supplier<T> supplier) throws InterruptedException { // polls every 100ms until supplier yields expected
+ TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource()); // give up after 10 seconds
+ while (!timeOut.hasTimedOut()) {
+ if (Objects.equals(expected, supplier.get())) return; // value equality: == on boxed Integer/Long compares references
+ Thread.sleep(100);
+ }
+ assertEquals(expected, supplier.get()); // final check yields a descriptive failure once the timeout has elapsed
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java b/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
index 9a24c2a..14f0a7c 100644
--- a/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
+++ b/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java
@@ -42,7 +42,7 @@ import org.apache.solr.client.solrj.request.schema.SchemaRequest.Field;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.client.solrj.response.schema.SchemaResponse.FieldResponse;
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
-import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkShardTerms;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrInputDocument;
@@ -908,23 +908,20 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
commit();
- // TODO: Could try checking ZK for LIR flags to ensure LIR has not kicked in
- // Check every 10ms, 100 times, for a replica to go down (& assert that it doesn't)
- ZkController zkController = shardToLeaderJetty.get(SHARD1).jetty.getCoreContainer().getZkController();
- String lirPath = zkController.getLeaderInitiatedRecoveryZnodePath(DEFAULT_TEST_COLLECTION_NAME, SHARD1);
- assertFalse (zkController.getZkClient().exists(lirPath, true));
-
- for (int i=0; i<100; i++) {
- Thread.sleep(10);
- cloudClient.getZkStateReader().forceUpdateCollection(DEFAULT_COLLECTION);
- ClusterState state = cloudClient.getZkStateReader().getClusterState();
-
- int numActiveReplicas = 0;
- for (Replica rep: state.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1).getReplicas())
- if (rep.getState().equals(Replica.State.ACTIVE))
- numActiveReplicas++;
-
- assertEquals("The replica receiving reordered updates must not have gone down", 3, numActiveReplicas);
+ try (ZkShardTerms zkShardTerms = new ZkShardTerms(DEFAULT_COLLECTION, SHARD1, cloudClient.getZkStateReader().getZkClient())) {
+ for (int i=0; i<100; i++) {
+ Thread.sleep(10);
+ cloudClient.getZkStateReader().forceUpdateCollection(DEFAULT_COLLECTION);
+ ClusterState state = cloudClient.getZkStateReader().getClusterState();
+
+ int numActiveReplicas = 0;
+ for (Replica rep: state.getCollection(DEFAULT_COLLECTION).getSlice(SHARD1).getReplicas()) {
+ assertTrue(zkShardTerms.canBecomeLeader(rep.getName()));
+ if (rep.getState().equals(Replica.State.ACTIVE))
+ numActiveReplicas++;
+ }
+ assertEquals("The replica receiving reordered updates must not have gone down", 3, numActiveReplicas);
+ }
}
for (SolrClient client: new SolrClient[] {LEADER, NONLEADERS.get(0),
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
----------------------------------------------------------------------
diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
index b031393..be2b7db 100644
--- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
+++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java
@@ -1963,6 +1963,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
long waitMs = 0L;
long maxWaitMs = maxWaitSecs * 1000L;
Replica leader = null;
+ ZkShardTerms zkShardTerms = new ZkShardTerms(testCollectionName, shardId, cloudClient.getZkStateReader().getZkClient());
while (waitMs < maxWaitMs && !allReplicasUp) {
cs = cloudClient.getZkStateReader().getClusterState();
assertNotNull(cs);
@@ -1981,7 +1982,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
// ensure all replicas are "active" and identify the non-leader replica
for (Replica replica : replicas) {
- if (replica.getState() != Replica.State.ACTIVE) {
+ if (!zkShardTerms.canBecomeLeader(replica.getName()) ||
+ replica.getState() != Replica.State.ACTIVE) {
log.info("Replica {} is currently {}", replica.getName(), replica.getState());
allReplicasUp = false;
}
@@ -1998,6 +2000,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
}
} // end while
+ zkShardTerms.close();
if (!allReplicasUp)
fail("Didn't see all replicas for shard "+shardId+" in "+testCollectionName+
" come up within " + maxWaitMs + " ms! ClusterState: " + printClusterStateInfo());
[2/2] lucene-solr:master: SOLR-11702: Redesign current LIR
implementation
Posted by da...@apache.org.
SOLR-11702: Redesign current LIR implementation
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/27ef6530
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/27ef6530
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/27ef6530
Branch: refs/heads/master
Commit: 27ef6530646a9af6f8fdf491afd80185bc4f7fee
Parents: 00d453d
Author: Cao Manh Dat <da...@apache.org>
Authored: Mon Jan 29 15:55:28 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Mon Jan 29 15:55:28 2018 +0700
----------------------------------------------------------------------
solr/CHANGES.txt | 6 +
.../client/solrj/embedded/JettySolrRunner.java | 16 +-
.../org/apache/solr/cloud/ElectionContext.java | 48 +-
.../cloud/LeaderInitiatedRecoveryThread.java | 1 +
.../solr/cloud/RecoveringCoreTermWatcher.java | 75 +++
.../org/apache/solr/cloud/RecoveryStrategy.java | 75 ++-
.../apache/solr/cloud/ZkCollectionTerms.java | 65 +++
.../org/apache/solr/cloud/ZkController.java | 112 +++--
.../org/apache/solr/cloud/ZkShardTerms.java | 475 +++++++++++++++++++
.../api/collections/CreateCollectionCmd.java | 18 +-
.../solr/handler/admin/CollectionsHandler.java | 45 +-
.../solr/handler/admin/PrepRecoveryOp.java | 7 +
.../solr/update/DefaultSolrCoreState.java | 19 +-
.../processor/DistributedUpdateProcessor.java | 86 +++-
.../org/apache/solr/cloud/ForceLeaderTest.java | 220 +++++++--
.../solr/cloud/HttpPartitionOnCommitTest.java | 178 +++++++
.../apache/solr/cloud/HttpPartitionTest.java | 179 +++++--
.../solr/cloud/LIRRollingUpdatesTest.java | 457 ++++++++++++++++++
.../LeaderInitiatedRecoveryOnCommitTest.java | 178 -------
...aderInitiatedRecoveryOnShardRestartTest.java | 12 +-
.../TestLeaderInitiatedRecoveryThread.java | 1 +
.../org/apache/solr/cloud/ZkShardTermsTest.java | 204 ++++++++
.../solr/update/TestInPlaceUpdatesDistrib.java | 33 +-
.../cloud/AbstractFullDistribZkTestBase.java | 5 +-
24 files changed, 2112 insertions(+), 403 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 2943f6c..57d40b3 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -71,6 +71,10 @@ Upgrade Notes
Before 7.3, the copied over configset was named the same as the collection name, but 7.3 onwards it will be named
with an additional ".AUTOCREATED" suffix.
+* SOLR-11702: The old LIR implementation (SOLR-5495) is now deprecated and replaced.
+ Solr will support rolling upgrades from old 7.x versions of Solr to the new one until
+ the last release of the 7.x major version.
+
New Features
----------------------
* SOLR-11285: Simulation framework for autoscaling. (ab)
@@ -113,6 +117,8 @@ New Features
* SOLR-11617: Alias metadata is now mutable via a new MODIFYALIAS command. Metadata is returned from LISTALIASES.
(Gus Heck via David Smiley)
+* SOLR-11702: Redesign current LIR implementation (Cao Manh Dat, shalin)
+
Bug Fixes
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index e5b81f8..23a8dc1 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -84,7 +84,7 @@ public class JettySolrRunner {
FilterHolder debugFilter;
private boolean waitOnSolr = false;
- private int lastPort = -1;
+ private int jettyPort = -1;
private final JettyConfig config;
private final String solrHome;
@@ -280,8 +280,10 @@ public class JettySolrRunner {
@Override
public void lifeCycleStarted(LifeCycle arg0) {
- lastPort = getFirstConnectorPort();
- nodeProperties.setProperty("hostPort", Integer.toString(lastPort));
+ jettyPort = getFirstConnectorPort();
+ int port = jettyPort;
+ if (proxyPort != -1) port = proxyPort;
+ nodeProperties.setProperty("hostPort", Integer.toString(port));
nodeProperties.setProperty("hostContext", config.context);
root.getServletContext().setAttribute(SolrDispatchFilter.PROPERTIES_ATTRIBUTE, nodeProperties);
@@ -384,7 +386,7 @@ public class JettySolrRunner {
// if started before, make a new server
if (startedBefore) {
waitOnSolr = false;
- int port = reusePort ? lastPort : this.config.port;
+ int port = reusePort ? jettyPort : this.config.port;
init(port);
} else {
startedBefore = true;
@@ -456,7 +458,7 @@ public class JettySolrRunner {
if (0 == conns.length) {
throw new RuntimeException("Jetty Server has no Connectors");
}
- return (proxyPort != -1) ? proxyPort : ((ServerConnector) conns[0]).getLocalPort();
+ return ((ServerConnector) conns[0]).getLocalPort();
}
/**
@@ -465,10 +467,10 @@ public class JettySolrRunner {
* @exception RuntimeException if there is no Connector
*/
public int getLocalPort() {
- if (lastPort == -1) {
+ if (jettyPort == -1) {
throw new IllegalStateException("You cannot get the port until this instance has started");
}
- return (proxyPort != -1) ? proxyPort : lastPort;
+ return (proxyPort != -1) ? proxyPort : jettyPort;
}
/**
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index 7169ea8..2d00151 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.Future;
@@ -491,7 +492,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
rejoinLeaderElection(core);
}
}
-
+
if (isLeader) {
// check for any replicas in my shard that were set to down by the previous leader
try {
@@ -530,6 +531,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
return docCollection.getReplica(replicaName);
}
+ @Deprecated
public void checkLIR(String coreName, boolean allReplicasInLine)
throws InterruptedException, KeeperException, IOException {
if (allReplicasInLine) {
@@ -551,7 +553,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
leaderProps.getStr(ZkStateReader.CORE_NODE_NAME_PROP), Replica.State.ACTIVE, core.getCoreDescriptor(), true);
}
}
-
+
} else {
try (SolrCore core = cc.getCore(coreName)) {
if (core != null) {
@@ -567,7 +569,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
}
}
-
+
+ @Deprecated
private void startLeaderInitiatedRecoveryOnReplicas(String coreName) throws Exception {
try (SolrCore core = cc.getCore(coreName)) {
CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
@@ -577,10 +580,10 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
if (coll == null || shardId == null) {
log.error("Cannot start leader-initiated recovery on new leader (core="+
- coreName+",coreNodeName=" + coreNodeName + ") because collection and/or shard is null!");
+ coreName+",coreNodeName=" + coreNodeName + ") because collection and/or shard is null!");
return;
}
-
+
String znodePath = zkController.getLeaderInitiatedRecoveryZnodePath(coll, shardId);
List<String> replicas = null;
try {
@@ -588,21 +591,28 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
} catch (NoNodeException nne) {
// this can be ignored
}
-
+
if (replicas != null && replicas.size() > 0) {
for (String replicaCoreNodeName : replicas) {
-
+
if (coreNodeName.equals(replicaCoreNodeName))
continue; // added safe-guard so we don't mark this core as down
-
+
+ if (zkController.getShardTerms(collection, shardId).registered(replicaCoreNodeName)) {
+ // the replica registered its term, so it is running with the new LIR implementation;
+ // we can put this replica into recovery by increasing our own term
+ zkController.getShardTerms(collection, shardId).ensureTermsIsHigher(coreNodeName, Collections.singleton(replicaCoreNodeName));
+ continue;
+ }
+
final Replica.State lirState = zkController.getLeaderInitiatedRecoveryState(coll, shardId, replicaCoreNodeName);
if (lirState == Replica.State.DOWN || lirState == Replica.State.RECOVERY_FAILED) {
log.info("After core={} coreNodeName={} was elected leader, a replica coreNodeName={} was found in state: "
+ lirState.toString() + " and needing recovery.", coreName, coreNodeName, replicaCoreNodeName);
- List<ZkCoreNodeProps> replicaProps =
+ List<ZkCoreNodeProps> replicaProps =
zkController.getZkStateReader().getReplicaProps(collection, shardId, coreNodeName);
-
- if (replicaProps != null && replicaProps.size() > 0) {
+
+ if (replicaProps != null && replicaProps.size() > 0) {
ZkCoreNodeProps coreNodeProps = null;
for (ZkCoreNodeProps p : replicaProps) {
if (((Replica)p.getNodeProps()).getName().equals(replicaCoreNodeName)) {
@@ -610,17 +620,18 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
break;
}
}
-
+
zkController.ensureReplicaInLeaderInitiatedRecovery(cc,
collection, shardId, coreNodeProps, core.getCoreDescriptor(),
false /* forcePublishState */);
- }
+ }
}
}
}
- } // core gets closed automagically
+ } // core gets closed automagically
}
+
// returns true if all replicas are found to be up, false if not
private boolean waitForReplicasToComeUp(int timeoutms) throws InterruptedException {
long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutms, TimeUnit.MILLISECONDS);
@@ -743,7 +754,14 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
// to make sure others participate in sync and leader election, we can be leader
return true;
}
-
+
+ String coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
+ if (zkController.getShardTerms(collection, shardId).registered(coreNodeName)
+ && !zkController.getShardTerms(collection, shardId).canBecomeLeader(coreNodeName)) {
+ log.info("Can't become leader, term of replica {} less than leader", coreNodeName);
+ return false;
+ }
+
if (core.getCoreDescriptor().getCloudDescriptor().getLastPublished() == Replica.State.ACTIVE) {
log.debug("My last published State was Active, it's okay to be the leader.");
return true;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java b/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
index 8c892ce..9c46236 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
@@ -45,6 +45,7 @@ import java.util.List;
* replica; used by a shard leader to nag a replica into recovering after the
* leader experiences an error trying to send an update request to the replica.
*/
+@Deprecated
public class LeaderInitiatedRecoveryThread extends Thread {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
new file mode 100644
index 0000000..26fec97
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveringCoreTermWatcher.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.solr.core.SolrCore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Start recovery of a core if its term is less than leader's term
+ */
+public class RecoveringCoreTermWatcher implements ZkShardTerms.CoreTermWatcher {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final SolrCore solrCore;
+ // used to avoid redoing recovery when only the terms of other replicas change:
+ // the idea is that, for a specific term of this replica, we only do recovery once
+ private final AtomicLong lastTermDoRecovery;
+
+ RecoveringCoreTermWatcher(SolrCore solrCore) {
+ this.solrCore = solrCore;
+ this.lastTermDoRecovery = new AtomicLong(-1); // -1: presumably below any real term, so the first lagging term triggers recovery
+ }
+
+ @Override
+ public boolean onTermChanged(ZkShardTerms.Terms terms) {
+ if (solrCore.isClosed()) {
+ return false; // NOTE(review): false presumably asks ZkShardTerms to drop this watcher; confirm against CoreTermWatcher
+ }
+
+ if (solrCore.getCoreDescriptor() == null || solrCore.getCoreDescriptor().getCloudDescriptor() == null) return true;
+
+ String coreNodeName = solrCore.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
+ if (terms.canBecomeLeader(coreNodeName)) return true; // nothing to do while this core is still eligible to lead
+ if (lastTermDoRecovery.get() < terms.getTerm(coreNodeName)) { // NOTE(review): get()/set() is not an atomic check-then-act; confirm callbacks are not concurrent
+ log.info("Start recovery on {} because core's term is less than leader's term", coreNodeName);
+ lastTermDoRecovery.set(terms.getTerm(coreNodeName)); // remember the term so later callbacks for the same term skip recovery
+ solrCore.getUpdateHandler().getSolrCoreState().doRecovery(solrCore.getCoreContainer(), solrCore.getCoreDescriptor());
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean equals(Object o) { // two watchers are equal when they are of the same class and wrap equal SolrCore instances
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ RecoveringCoreTermWatcher that = (RecoveringCoreTermWatcher) o;
+
+ return solrCore.equals(that.solrCore);
+ }
+
+ @Override
+ public int hashCode() {
+ return solrCore.hashCode(); // consistent with equals: derived from the wrapped core only
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 3ab4eca..63dfe19 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -35,8 +35,10 @@ import org.apache.solr.client.solrj.impl.HttpSolrClient.HttpUriRequestResponse;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.SolrPingResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
@@ -458,7 +460,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
core.getCoreDescriptor());
return;
}
-
+
// we temporary ignore peersync for tlog replicas
boolean firstTime = replicaType != Replica.Type.TLOG;
@@ -516,21 +518,18 @@ public class RecoveryStrategy implements Runnable, Closeable {
zkController.stopReplicationFromLeader(coreName);
}
+ final String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
Future<RecoveryInfo> replayFuture = null;
while (!successfulRecovery && !Thread.currentThread().isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
try {
CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
- ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
- cloudDesc.getCollectionName(), cloudDesc.getShardId());
-
- final String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
- final String leaderCoreName = leaderprops.getStr(ZkStateReader.CORE_NAME_PROP);
-
- String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
-
- String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
+ final Replica leader = pingLeader(ourUrl, core.getCoreDescriptor(), true);
+ if (isClosed()) {
+ LOG.info("RecoveryStrategy has been closed");
+ break;
+ }
- boolean isLeader = leaderUrl.equals(ourUrl);
+ boolean isLeader = leader.getCoreUrl().equals(ourUrl);
if (isLeader && !cloudDesc.isLeader()) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
}
@@ -541,12 +540,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
return;
}
-
+
LOG.info("Begin buffering updates. core=[{}]", coreName);
ulog.bufferUpdates();
replayed = false;
- LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leaderUrl,
+ LOG.info("Publishing state of core [{}] as recovering, leader is [{}] and I am [{}]", core.getName(), leader.getCoreUrl(),
ourUrl);
zkController.publish(core.getCoreDescriptor(), Replica.State.RECOVERING);
@@ -565,7 +564,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
break;
}
- sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName, slice);
+ sendPrepRecoveryCmd(leader.getBaseUrl(), leader.getCoreName(), slice);
if (isClosed()) {
LOG.info("RecoveryStrategy has been closed");
@@ -585,11 +584,11 @@ public class RecoveryStrategy implements Runnable, Closeable {
// first thing we just try to sync
if (firstTime) {
firstTime = false; // only try sync the first time through the loop
- LOG.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leaderUrl, recoveringAfterStartup);
+ LOG.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leader.getCoreUrl(), recoveringAfterStartup);
// System.out.println("Attempting to PeerSync from " + leaderUrl
// + " i am:" + zkController.getNodeName());
PeerSync peerSync = new PeerSync(core,
- Collections.singletonList(leaderUrl), ulog.getNumRecordsToKeep(), false, false);
+ Collections.singletonList(leader.getCoreUrl()), ulog.getNumRecordsToKeep(), false, false);
peerSync.setStartingVersions(recentVersions);
boolean syncSuccess = peerSync.sync().isSuccess();
if (syncSuccess) {
@@ -623,7 +622,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
try {
- replicate(zkController.getNodeName(), core, leaderprops);
+ replicate(zkController.getNodeName(), core, leader);
if (isClosed()) {
LOG.info("RecoveryStrategy has been closed");
@@ -745,6 +744,48 @@ public class RecoveryStrategy implements Runnable, Closeable {
LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
}
+ /**
+ * Looks up the current leader of this core's shard and pings it until a usable answer is obtained.
+ * <p>
+ * The leader replica is returned as soon as one of the following holds: this recovery strategy has been closed,
+ * the leader is this core itself, the ping succeeds, or the ping fails with something other than an I/O problem.
+ * When the ping fails with an {@link IOException} (directly or as the cause), the method sleeps 500 ms and starts over
+ * with a fresh leader lookup.
+ *
+ * @param ourUrl core URL of this replica, used to detect that we are the leader ourselves
+ * @param coreDesc descriptor of the recovering core
+ * @param mayPutReplicaAsDown if true, an ACTIVE replica is published as DOWN once the first ping attempt has failed
+ * @return the leader replica found by the last lookup
+ * @throws Exception if the leader lookup or publishing the state fails, or the retry sleep is interrupted
+ */
+ private final Replica pingLeader(String ourUrl, CoreDescriptor coreDesc, boolean mayPutReplicaAsDown) throws Exception {
+ int numTried = 0;
+ while (true) {
+ CloudDescriptor cloudDesc = coreDesc.getCloudDescriptor();
+ DocCollection docCollection = zkStateReader.getClusterState().getCollection(cloudDesc.getCollectionName());
+ // numTried == 1 is only reached after the first ping attempt failed with an I/O error
+ if (mayPutReplicaAsDown && numTried == 1 &&
+ docCollection.getReplica(cloudDesc.getCoreNodeName()).getState() == Replica.State.ACTIVE) {
+ // this operation may take a long time, by putting replica into DOWN state, client won't query this replica
+ zkController.publish(coreDesc, Replica.State.DOWN);
+ }
+ numTried++;
+ final Replica leaderReplica = zkStateReader.getLeaderRetry(
+ cloudDesc.getCollectionName(), cloudDesc.getShardId());
+
+ if (isClosed()) {
+ return leaderReplica;
+ }
+
+ // we are the leader ourselves, there is nobody to ping
+ if (leaderReplica.getCoreUrl().equals(ourUrl)) {
+ return leaderReplica;
+ }
+
+ try (HttpSolrClient httpSolrClient = new HttpSolrClient.Builder(leaderReplica.getCoreUrl())
+ .withSocketTimeout(1000)
+ .withConnectionTimeout(1000)
+ .build()) {
+ // only reachability matters here, the ping response itself is not inspected
+ httpSolrClient.ping();
+ return leaderReplica;
+ } catch (IOException e) {
+ LOG.info("Failed to connect leader {} on recovery, try again", leaderReplica.getBaseUrl(), e);
+ Thread.sleep(500);
+ } catch (Exception e) {
+ if (e.getCause() instanceof IOException) {
+ LOG.info("Failed to connect leader {} on recovery, try again", leaderReplica.getBaseUrl(), e);
+ Thread.sleep(500);
+ } else {
+ // the leader answered, just not successfully; that is enough to proceed with recovery
+ return leaderReplica;
+ }
+ }
+ }
+ }
+
public static Runnable testing_beforeReplayBufferingUpdates;
final private Future<RecoveryInfo> replay(SolrCore core)
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
new file mode 100644
index 0000000..b232f9b
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.core.CoreDescriptor;
+
+/**
+ * Used to manage all ZkShardTerms of a collection.
+ * Instances are created lazily per shard and kept in a map guarded by its own monitor.
+ */
+class ZkCollectionTerms implements AutoCloseable {
+ private final String collection;
+ private final Map<String, ZkShardTerms> terms;
+ private final SolrZkClient zkClient;
+
+ ZkCollectionTerms(String collection, SolrZkClient client) {
+ this.collection = collection;
+ this.terms = new HashMap<>();
+ this.zkClient = client;
+ ObjectReleaseTracker.track(this);
+ }
+
+
+ /**
+ * Returns the ZkShardTerms of the given shard, creating it on first access.
+ * @param shardId id of the shard
+ * @return the (possibly newly created) terms object of that shard
+ */
+ public ZkShardTerms getShard(String shardId) {
+ synchronized (terms) {
+ return terms.computeIfAbsent(shardId, id -> new ZkShardTerms(collection, id, zkClient));
+ }
+ }
+
+ /**
+ * Removes the term of the given core from its shard; when the shard's terms object reports that it
+ * should not be reused, it is dropped from the map and closed.
+ * @param shardId id of the shard the core belongs to
+ * @param coreDescriptor descriptor of the core being removed
+ */
+ public void remove(String shardId, CoreDescriptor coreDescriptor) {
+ synchronized (terms) {
+ if (getShard(shardId).removeTerm(coreDescriptor)) {
+ terms.remove(shardId).close();
+ }
+ }
+ }
+
+ /**
+ * Closes the terms objects of all shards and releases this object from the ObjectReleaseTracker.
+ */
+ @Override
+ public void close() {
+ synchronized (terms) {
+ terms.values().forEach(ZkShardTerms::close);
+ }
+ ObjectReleaseTracker.release(this);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 365da65..7898e96 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -193,7 +193,6 @@ public class ZkController {
private final Map<ContextKey, ElectionContext> electionContexts = Collections.synchronizedMap(new HashMap<>());
private final SolrZkClient zkClient;
- private final ZkCmdExecutor cmdExecutor;
public final ZkStateReader zkStateReader;
private SolrCloudManager cloudManager;
private CloudSolrClient cloudSolrClient;
@@ -210,6 +209,7 @@ public class ZkController {
private LeaderElector overseerElector;
private Map<String, ReplicateFromLeader> replicateFromLeaders = new ConcurrentHashMap<>();
+ private final Map<String, ZkCollectionTerms> collectionToTerms = new HashMap<>();
// for now, this can be null in tests, in which case recovery will be inactive, and other features
// may accept defaults or use mocks rather than pulling things from a CoreContainer
@@ -226,6 +226,7 @@ public class ZkController {
private volatile boolean isClosed;
+ @Deprecated
// keeps track of replicas that have been asked to recover by leaders running on this node
private final Map<String, String> replicasInLeaderInitiatedRecovery = new HashMap<String, String>();
@@ -323,7 +324,7 @@ public class ZkController {
@Override
public void command() {
log.info("ZooKeeper session re-connected ... refreshing core states after session expiration.");
-
+ clearZkCollectionTerms();
try {
zkStateReader.createClusterStateWatchersAndUpdate();
@@ -435,7 +436,6 @@ public class ZkController {
this.overseerRunningMap = Overseer.getRunningMap(zkClient);
this.overseerCompletedMap = Overseer.getCompletedMap(zkClient);
this.overseerFailureMap = Overseer.getFailureMap(zkClient);
- cmdExecutor = new ZkCmdExecutor(clientTimeout);
zkStateReader = new ZkStateReader(zkClient, () -> {
if (cc != null) cc.securityNodeChanged();
});
@@ -547,6 +547,9 @@ public class ZkController {
*/
public void close() {
this.isClosed = true;
+ synchronized (collectionToTerms) {
+ collectionToTerms.values().forEach(ZkCollectionTerms::close);
+ }
try {
for (ElectionContext context : electionContexts.values()) {
try {
@@ -1034,7 +1037,14 @@ public class ZkController {
final String coreZkNodeName = desc.getCloudDescriptor().getCoreNodeName();
assert coreZkNodeName != null : "we should have a coreNodeName by now";
-
+
+ ZkShardTerms shardTerms = getShardTerms(collection, cloudDesc.getShardId());
+
+ // This flag is used for testing rolling updates and should be removed in SOLR-11812
+ boolean isRunningInNewLIR = "new".equals(desc.getCoreProperty("lirVersion", "new"));
+ if (isRunningInNewLIR) {
+ shardTerms.registerTerm(coreZkNodeName);
+ }
String shardId = cloudDesc.getShardId();
Map<String,Object> props = new HashMap<>();
// we only put a subset of props into the leader node
@@ -1118,15 +1128,17 @@ public class ZkController {
}
}
boolean didRecovery
- = checkRecovery(recoverReloadedCores, isLeader, skipRecovery, collection, coreZkNodeName, core, cc, afterExpiration);
+ = checkRecovery(recoverReloadedCores, isLeader, skipRecovery, collection, coreZkNodeName, shardId, core, cc, afterExpiration);
if (!didRecovery) {
if (isTlogReplicaAndNotLeader) {
startReplicationFromLeader(coreName, true);
}
publish(desc, Replica.State.ACTIVE);
}
-
-
+
+ if (isRunningInNewLIR && replicaType != Type.PULL) {
+ shardTerms.addListener(new RecoveringCoreTermWatcher(core));
+ }
core.getCoreDescriptor().getCloudDescriptor().setHasRegistered(true);
}
@@ -1295,7 +1307,7 @@ public class ZkController {
* Returns whether or not a recovery was started
*/
private boolean checkRecovery(boolean recoverReloadedCores, final boolean isLeader, boolean skipRecovery,
- final String collection, String shardId,
+ final String collection, String coreZkNodeName, String shardId,
SolrCore core, CoreContainer cc, boolean afterExpiration) {
if (SKIP_AUTO_RECOVERY) {
log.warn("Skipping recovery according to sys prop solrcloud.skip.autorecovery");
@@ -1322,6 +1334,13 @@ public class ZkController {
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
return true;
}
+
+ ZkShardTerms zkShardTerms = getShardTerms(collection, shardId);
+ if (zkShardTerms.registered(coreZkNodeName) && !zkShardTerms.canBecomeLeader(coreZkNodeName)) {
+ log.info("Leader's term larger than core " + core.getName() + "; starting recovery process");
+ core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
+ return true;
+ }
} else {
log.info("I am the leader, no recovery necessary");
}
@@ -1372,6 +1391,7 @@ public class ZkController {
String shardId = cd.getCloudDescriptor().getShardId();
String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
+
// If the leader initiated recovery, then verify that this replica has performed
// recovery as requested before becoming active; don't even look at lirState if going down
if (state != Replica.State.DOWN) {
@@ -1394,7 +1414,7 @@ public class ZkController {
}
}
}
-
+
Map<String,Object> props = new HashMap<>();
props.put(Overseer.QUEUE_OPERATION, "state");
props.put(ZkStateReader.STATE_PROP, state.toString());
@@ -1430,6 +1450,11 @@ public class ZkController {
log.info("The core '{}' had failed to initialize before.", cd.getName());
}
+ // This flag is used for testing rolling updates and should be removed in SOLR-11812
+ boolean isRunningInNewLIR = "new".equals(cd.getCoreProperty("lirVersion", "new"));
+ if (state == Replica.State.RECOVERING && isRunningInNewLIR) {
+ getShardTerms(collection, shardId).setEqualsToMax(coreNodeName);
+ }
ZkNodeProps m = new ZkNodeProps(props);
if (updateLastState) {
@@ -1441,23 +1466,28 @@ public class ZkController {
}
}
- private boolean needsToBeAssignedShardId(final CoreDescriptor desc,
- final ClusterState state, final String coreNodeName) {
-
- final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
+ public ZkShardTerms getShardTerms(String collection, String shardId) {
+ return getCollectionTerms(collection).getShard(shardId);
+ }
- final String shardId = state.getShardId(getNodeName(), desc.getName());
+ private ZkCollectionTerms getCollectionTerms(String collection) {
+ synchronized (collectionToTerms) {
+ if (!collectionToTerms.containsKey(collection)) collectionToTerms.put(collection, new ZkCollectionTerms(collection, zkClient));
+ return collectionToTerms.get(collection);
+ }
+ }
- if (shardId != null) {
- cloudDesc.setShardId(shardId);
- return false;
+ public void clearZkCollectionTerms() {
+ synchronized (collectionToTerms) {
+ collectionToTerms.values().forEach(ZkCollectionTerms::close);
+ collectionToTerms.clear();
}
- return true;
}
public void unregister(String coreName, CoreDescriptor cd) throws Exception {
final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
final String collection = cd.getCloudDescriptor().getCollectionName();
+ getCollectionTerms(collection).remove(cd.getCloudDescriptor().getShardId(), cd);
if (Strings.isNullOrEmpty(collection)) {
log.error("No collection was specified.");
@@ -1733,7 +1763,7 @@ public class ZkController {
boolean isLeader = leaderProps.getCoreUrl().equals(ourUrl);
if (!isLeader && !SKIP_AUTO_RECOVERY) {
- // detect if this core is in leader-initiated recovery and if so,
+ // detect if this core is in leader-initiated recovery and if so,
// then we don't need the leader to wait on seeing the down state
Replica.State lirState = null;
try {
@@ -1743,9 +1773,9 @@ public class ZkController {
" is in leader-initiated recovery due to: " + exc, exc);
}
- if (lirState != null) {
- log.debug("Replica " + myCoreNodeName +
- " is already in leader-initiated recovery, so not waiting for leader to see down state.");
+ if (lirState != null || !getShardTerms(collection, shard).canBecomeLeader(myCoreNodeName)) {
+ log.debug("Term of replica " + myCoreNodeName +
+ " is already less than leader, so not waiting for leader to see down state.");
} else {
log.info("Replica " + myCoreNodeName +
@@ -2055,6 +2085,7 @@ public class ZkController {
* false means the node is not live either, so no point in trying to send recovery commands
* to it.
*/
+ @Deprecated
public boolean ensureReplicaInLeaderInitiatedRecovery(
final CoreContainer container,
final String collection, final String shardId, final ZkCoreNodeProps replicaCoreProps,
@@ -2117,13 +2148,14 @@ public class ZkController {
" is not live, so skipping leader-initiated recovery for replica: core={} coreNodeName={}",
replicaCoreProps.getCoreName(), replicaCoreNodeName);
// publishDownState will be false to avoid publishing the "down" state too many times
- // as many errors can occur together and will each call into this method (SOLR-6189)
+ // as many errors can occur together and will each call into this method (SOLR-6189)
}
}
return nodeIsLive;
}
+ @Deprecated
public boolean isReplicaInRecoveryHandling(String replicaUrl) {
boolean exists = false;
synchronized (replicasInLeaderInitiatedRecovery) {
@@ -2132,12 +2164,14 @@ public class ZkController {
return exists;
}
+ @Deprecated
public void removeReplicaFromLeaderInitiatedRecoveryHandling(String replicaUrl) {
synchronized (replicasInLeaderInitiatedRecovery) {
replicasInLeaderInitiatedRecovery.remove(replicaUrl);
}
}
+ @Deprecated
public Replica.State getLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName) {
final Map<String, Object> stateObj = getLeaderInitiatedRecoveryStateObject(collection, shardId, coreNodeName);
if (stateObj == null) {
@@ -2147,6 +2181,7 @@ public class ZkController {
return stateStr == null ? null : Replica.State.getState(stateStr);
}
+ @Deprecated
public Map<String, Object> getLeaderInitiatedRecoveryStateObject(String collection, String shardId, String coreNodeName) {
if (collection == null || shardId == null || coreNodeName == null)
@@ -2191,6 +2226,7 @@ public class ZkController {
return stateObj;
}
+ @Deprecated
public void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName,
Replica.State state, CoreDescriptor leaderCd, boolean retryOnConnLoss) {
if (collection == null || shardId == null || coreNodeName == null) {
@@ -2199,12 +2235,12 @@ public class ZkController {
+ "; shardId=" + shardId + "; coreNodeName=" + coreNodeName);
return; // if we don't have complete data about a core in cloud mode, do nothing
}
-
+
assert leaderCd != null;
assert leaderCd.getCloudDescriptor() != null;
String leaderCoreNodeName = leaderCd.getCloudDescriptor().getCoreNodeName();
-
+
String znodePath = getLeaderInitiatedRecoveryZnodePath(collection, shardId, coreNodeName);
if (state == Replica.State.ACTIVE) {
@@ -2269,29 +2305,29 @@ public class ZkController {
private void markShardAsDownIfLeader(String collection, String shardId, CoreDescriptor leaderCd,
String znodePath, byte[] znodeData,
boolean retryOnConnLoss) throws KeeperException, InterruptedException {
-
+
if (!leaderCd.getCloudDescriptor().isLeader()) {
log.info("No longer leader, aborting attempt to mark shard down as part of LIR");
throw new NotLeaderException(ErrorCode.SERVER_ERROR, "Locally, we do not think we are the leader.");
}
-
+
ContextKey key = new ContextKey(collection, leaderCd.getCloudDescriptor().getCoreNodeName());
ElectionContext context = electionContexts.get(key);
-
+
// we make sure we locally think we are the leader before and after getting the context - then
// we only try zk if we still think we are the leader and have our leader context
if (context == null || !leaderCd.getCloudDescriptor().isLeader()) {
log.info("No longer leader, aborting attempt to mark shard down as part of LIR");
throw new NotLeaderException(ErrorCode.SERVER_ERROR, "Locally, we do not think we are the leader.");
}
-
+
// we think we are the leader - get the expected shard leader version
// we use this version and multi to ensure *only* the current zk registered leader
// for a shard can put a replica into LIR
-
+
Integer leaderZkNodeParentVersion = ((ShardLeaderElectionContextBase)context).getLeaderZkNodeParentVersion();
-
+
// TODO: should we do this optimistically to avoid races?
if (zkClient.exists(znodePath, retryOnConnLoss)) {
List<Op> ops = new ArrayList<>(2);
@@ -2306,7 +2342,7 @@ public class ZkController {
} catch (KeeperException.NodeExistsException nee) {
// if it exists, that's great!
}
-
+
// we only create the entry if the context we are using is registered as the current leader in ZK
List<Op> ops = new ArrayList<>(2);
ops.add(Op.check(new org.apache.hadoop.fs.Path(((ShardLeaderElectionContextBase)context).leaderPath).getParent().toString(), leaderZkNodeParentVersion));
@@ -2316,11 +2352,13 @@ public class ZkController {
}
}
- public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId) {
+ @Deprecated
+ public static String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId) {
return "/collections/" + collection + "/leader_initiated_recovery/" + shardId;
}
- public String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId, String coreNodeName) {
+ @Deprecated
+ public static String getLeaderInitiatedRecoveryZnodePath(String collection, String shardId, String coreNodeName) {
return getLeaderInitiatedRecoveryZnodePath(collection, shardId) + "/" + coreNodeName;
}
@@ -2608,12 +2646,6 @@ public class ZkController {
};
}
- public String getLeaderSeqPath(String collection, String coreNodeName) {
- ContextKey key = new ContextKey(collection, coreNodeName);
- ElectionContext context = electionContexts.get(key);
- return context != null ? context.leaderSeqPath : null;
- }
-
/**
* Thrown during leader initiated recovery process if current node is not leader
*/
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
new file mode 100644
index 0000000..7dc0d57
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class used to interact with a ZK term node.
+ * Each ZK term node relates to a shard of a collection and has this format (in JSON):
+ * <p>
+ * <code>
+ * {
+ * "replicaNodeName1" : 1,
+ * "replicaNodeName2" : 2,
+ * ..
+ * }
+ * </code>
+ * <p>
+ * The values corresponding to replicas are called terms.
+ * Only replicas with the highest term value are considered up to date and are able to become leader and serve queries.
+ * <p>
+ * Terms can only be updated in two strict ways:
+ * <ul>
+ * <li>A replica sets its term equal to the leader's term
+ * <li>The leader increases its own term and the terms of some other replicas by 1
+ * </ul>
+ * This class should not be reused after {@link org.apache.zookeeper.Watcher.Event.KeeperState#Expired} event
+ */
+public class ZkShardTerms implements AutoCloseable{
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final Object writingLock = new Object();
+ private final String collection;
+ private final String shard;
+ private final String znodePath;
+ private final SolrZkClient zkClient;
+ private final Set<CoreTermWatcher> listeners = new HashSet<>();
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+ // Current snapshot of the shard's terms. It is replaced under writingLock in setNewTerms() but read
+ // without any lock by the query and retry methods, so it has to be volatile for readers to see updates.
+ private volatile Terms terms;
+
+ // Listener through which a core is notified about changes of its shard's terms
+ interface CoreTermWatcher {
+ // Called with the new terms; return true to stay registered for the next change, false to be removed from the listeners
+ boolean onTermChanged(Terms terms);
+ }
+
+ public ZkShardTerms(String collection, String shard, SolrZkClient zkClient) { // binds this object to the term znode of one shard
+ this.znodePath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/terms/" + shard;
+ this.collection = collection;
+ this.shard = shard;
+ this.zkClient = zkClient;
+ ensureTermNodeExist(); // create the term znode if it is missing, before it is read
+ refreshTerms(); // load the initial snapshot of the terms
+ retryRegisterWatcher(); // from now on a change of the znode triggers another refresh
+ ObjectReleaseTracker.track(this); // paired with the release() call in close()
+ }
+
+ /**
+ * Make sure the leader's term is higher than the terms of the given replicas.
+ * The new terms are recomputed from the latest snapshot and written to ZK until either nothing
+ * needs to change any more or a write succeeds.
+ * @param leader coreNodeName of leader
+ * @param replicasNeedingRecovery set of replicas whose terms should be lower than the leader's term
+ */
+ public void ensureTermsIsHigher(String leader, Set<String> replicasNeedingRecovery) {
+ while (true) {
+ final Terms candidate = terms.increaseTerms(leader, replicasNeedingRecovery);
+ // null: nothing left to change; successful save: we are done as well
+ if (candidate == null || forceSaveTerms(candidate)) {
+ return;
+ }
+ }
+ }
+
+ /**
+ * Can this replica become leader, i.e. is this replica's term equal to the highest term of the shard?
+ * Delegates to {@link Terms#canBecomeLeader(String)} on the current terms snapshot.
+ * @param coreNodeName of the replica
+ * @return true if this replica can become leader, false if otherwise
+ */
+ public boolean canBecomeLeader(String coreNodeName) {
+ return terms.canBecomeLeader(coreNodeName);
+ }
+
+ /**
+ * Has this replica registered its term, i.e. does the current terms snapshot contain an entry for it?
+ * @param coreNodeName of the replica
+ * @return true if this replica registered its term, false if otherwise
+ */
+ public boolean registered(String coreNodeName) {
+ return terms.getTerm(coreNodeName) != null;
+ }
+
+ public void close() { // stops watching, drops all listeners and releases this object from the tracker
+ // once this flag is set, retryRegisterWatcher() no longer registers a watcher
+ isClosed.set(true);
+ synchronized (listeners) {
+ listeners.clear();
+ }
+ ObjectReleaseTracker.release(this);
+ }
+
+ // package private for testing, only used by tests; returns a copy of the current term map taken under the writing lock
+ Map<String, Long> getTerms() {
+ synchronized (writingLock) {
+ return new HashMap<>(terms.values);
+ }
+ }
+
+ /**
+ * Add a listener so that the next time the shard's terms get updated, the listener will be called.
+ * A listener stays registered until its {@code onTermChanged} returns false or this object is closed.
+ */
+ void addListener(CoreTermWatcher listener) {
+ synchronized (listeners) {
+ listeners.add(listener);
+ }
+ }
+
+ /**
+ * Remove the coreNodeName from the terms map and also remove any expired listeners.
+ * Every listener is invoked once with the current terms; those returning false are dropped.
+ * @param cd descriptor of the core whose term should be removed
+ * @return true if this object should not be reused: no listener is left after a successful save,
+ * the term node no longer exists in ZK, or {@code Terms#removeTerm} returned null (nothing to change)
+ */
+ boolean removeTerm(CoreDescriptor cd) {
+ int numListeners;
+ synchronized (listeners) {
+ // solrcore already closed
+ listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(terms));
+ numListeners = listeners.size();
+ }
+ Terms newTerms;
+ while ( (newTerms = terms.removeTerm(cd.getCloudDescriptor().getCoreNodeName())) != null) { // recomputed from the latest snapshot on each attempt
+ try {
+ if (saveTerms(newTerms)) return numListeners == 0;
+ } catch (KeeperException.NoNodeException e) {
+ return true; // the term node is gone, there is nothing left to remove from
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Register a term for the replica (the term value will be 0).
+ * Nothing happens when a term is already associated with the replica.
+ * @param coreNodeName of the replica
+ */
+ void registerTerm(String coreNodeName) {
+ while (true) {
+ final Terms candidate = terms.registerTerm(coreNodeName);
+ if (candidate == null) {
+ return;
+ }
+ if (forceSaveTerms(candidate)) {
+ return;
+ }
+ }
+ }
+
+ /**
+ * Raise the replica's term to the leader's term.
+ * Retries against the latest snapshot until nothing needs to change or the write to ZK succeeds.
+ * @param coreNodeName of the replica
+ */
+ public void setEqualsToMax(String coreNodeName) {
+ while (true) {
+ final Terms candidate = terms.setEqualsToMax(coreNodeName);
+ if (candidate == null) {
+ return;
+ }
+ if (forceSaveTerms(candidate)) {
+ return;
+ }
+ }
+ }
+
+ /**
+ * Look up the term of a replica in the current snapshot.
+ * @param coreNodeName of the replica
+ * @return the replica's term, or -1 if no term is registered for it
+ */
+ public long getTerm(String coreNodeName) {
+ final Long registeredTerm = terms.getTerm(coreNodeName);
+ if (registeredTerm == null) {
+ return -1;
+ }
+ return registeredTerm;
+ }
+
+ // package private for testing, only used by tests; number of currently registered listeners
+ int getNumListeners() {
+ synchronized (listeners) {
+ return listeners.size();
+ }
+ }
+
+ /**
+ * Set new terms to ZK.
+ * If the corresponding ZK term node does not exist, it is created and false is returned,
+ * so the caller's retry loop can try again against the new node.
+ * @param newTerms to be set
+ * @return true if the terms were saved successfully to ZK, false otherwise
+ */
+ private boolean forceSaveTerms(Terms newTerms) {
+ try {
+ return saveTerms(newTerms);
+ } catch (KeeperException.NoNodeException e) {
+ ensureTermNodeExist();
+ return false;
+ }
+ }
+
+ /**
+ * Set new terms to ZK; the version of the new terms must match the current version of the ZK term node.
+ * On success the local snapshot is replaced by the saved values with the version returned by ZK.
+ * On a version mismatch the local snapshot is refreshed from ZK and false is returned so the caller can retry.
+ * @param newTerms to be set
+ * @return true if the terms were saved successfully to ZK, false otherwise
+ * @throws KeeperException.NoNodeException if the corresponding ZK term node does not exist
+ * @throws SolrException on any other failure, including interruption of the calling thread
+ */
+ private boolean saveTerms(Terms newTerms) throws KeeperException.NoNodeException {
+ byte[] znodeData = Utils.toJSON(newTerms.values);
+ try {
+ Stat stat = zkClient.setData(znodePath, znodeData, newTerms.version, true);
+ setNewTerms(new Terms(newTerms.values, stat.getVersion()));
+ return true;
+ } catch (KeeperException.BadVersionException e) {
+ log.info("Failed to save terms, version is not match, retrying");
+ refreshTerms();
+ } catch (KeeperException.NoNodeException e) {
+ throw e;
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ // keep the interrupt status visible to callers, it must not get lost in the wrapping exception
+ Thread.currentThread().interrupt();
+ }
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error save shard term for collection:" + collection, e);
+ }
+ return false;
+ }
+
+ /**
+ * Create the corresponding ZK term node if it does not exist yet.
+ * First the collection's "terms" parent node is created, then the node of this shard with an empty term map.
+ * Losing the creation race against another client is fine and silently ignored.
+ * @throws SolrException if ZooKeeper reports an error or the calling thread is interrupted
+ */
+ private void ensureTermNodeExist() {
+ String path = "/collections/"+collection+ "/terms";
+ try {
+ if (!zkClient.exists(path, true)) {
+ try {
+ zkClient.makePath(path, true);
+ } catch (KeeperException.NodeExistsException e) {
+ // it's okay if another beats us creating the node
+ }
+ }
+ path += "/"+shard;
+ if (!zkClient.exists(path, true)) {
+ try {
+ Map<String, Long> initialTerms = new HashMap<>();
+ zkClient.create(path, Utils.toJSON(initialTerms), CreateMode.PERSISTENT, true);
+ } catch (KeeperException.NodeExistsException e) {
+ // it's okay if another beats us creating the node
+ }
+ }
+ } catch (InterruptedException e) {
+ // restore the interrupt status (Thread.interrupted() would clear it) so callers can still observe it
+ Thread.currentThread().interrupt();
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating shard term node in Zookeeper for collection:" + collection, e);
+ } catch (KeeperException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error creating shard term node in Zookeeper for collection:" + collection, e);
+ }
+ }
+
+ /**
+ * Fetch the latest terms from ZK and install them as the current snapshot.
+ * The snapshot is only replaced (and listeners only notified) when the fetched version is newer,
+ * see {@code setNewTerms}.
+ * @throws SolrException if ZooKeeper reports an error or the calling thread is interrupted
+ */
+ public void refreshTerms() {
+ Terms newTerms;
+ try {
+ Stat stat = new Stat();
+ byte[] data = zkClient.getData(znodePath, null, stat, true);
+ newTerms = new Terms((Map<String, Long>) Utils.fromJSON(data), stat.getVersion());
+ } catch (KeeperException e) {
+ // a ZK error says nothing about interruption, so the interrupt status is left untouched here
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection:" + collection, e);
+ } catch (InterruptedException e) {
+ // restore the interrupt status so callers can still observe the interruption
+ Thread.currentThread().interrupt();
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection:" + collection, e);
+ }
+
+ setNewTerms(newTerms);
+ }
+
+ /**
+ * Retry registering a watcher on the corresponding ZK term node.
+ * Loops until the watcher is registered or this object is closed. Session expiry and auth failure are
+ * treated as unrecoverable: this object marks itself closed and gives up. Any other KeeperException
+ * makes the method wait for the ZK connection before the next attempt.
+ */
+ private void retryRegisterWatcher() {
+ while (!isClosed.get()) {
+ try {
+ registerWatcher();
+ return;
+ } catch (KeeperException.SessionExpiredException | KeeperException.AuthFailedException e) {
+ isClosed.set(true); // no further watcher registration will be attempted
+ log.error("Failed watching shard term for collection: {} due to unrecoverable exception", collection, e);
+ return;
+ } catch (KeeperException e) {
+ log.warn("Failed watching shard term for collection:{}, retrying!", collection, e);
+ try {
+ zkClient.getConnectionManager().waitForConnected(zkClient.getZkClientTimeout());
+ } catch (TimeoutException te) {
+ if (Thread.interrupted()) { // NOTE(review): this check also clears the interrupt flag; confirm that is intended
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error watching shard term for collection:" + collection, te);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Register a watcher on the corresponding ZK term node.
+ * When the watcher fires for anything but a session event, it registers a new watcher and then
+ * refreshes the local terms from ZK.
+ * @throws KeeperException if ZooKeeper fails to set the watcher
+ * @throws SolrException if the calling thread is interrupted
+ */
+ private void registerWatcher() throws KeeperException {
+ Watcher watcher = event -> {
+ // session events are not change events, and do not remove the watcher
+ if (Watcher.Event.EventType.None == event.getType()) {
+ return;
+ }
+ retryRegisterWatcher();
+ // Some events may be missed during register a watcher, so it is safer to refresh terms after registering watcher
+ refreshTerms();
+ };
+ try {
+ // exists operation is faster than getData operation
+ zkClient.exists(znodePath, watcher, true);
+ } catch (InterruptedException e) {
+ // restore the interrupt status (Thread.interrupted() would clear it) so callers can still observe it
+ Thread.currentThread().interrupt();
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error watching shard term for collection:" + collection, e);
+ }
+ }
+
+
+ /**
+ * Atomically update {@link ZkShardTerms#terms} and call listeners.
+ * The new terms are accepted only if no terms are set yet or their ZK version is newer than the current one.
+ * Listeners are called after the writing lock has been released, and only if the terms were actually replaced.
+ * @param newTerms to be set
+ */
+ private void setNewTerms(Terms newTerms) {
+ boolean isChanged = false;
+ synchronized (writingLock) {
+ if (terms == null || newTerms.version > terms.version) {
+ terms = newTerms;
+ isChanged = true;
+ }
+ }
+ if (isChanged) onTermUpdates(newTerms);
+ }
+
+ /**
+  * Passes {@code newTerms} to every registered listener and drops each listener whose
+  * {@code onTermChanged} call returns {@code false}.
+  */
+ private void onTermUpdates(Terms newTerms) {
+   synchronized (listeners) {
+     listeners.removeIf(listener -> {
+       boolean keepListener = listener.onTermChanged(newTerms);
+       return !keepListener;
+     });
+   }
+ }
+
+ /**
+  * Holds the term of every replica of a shard, together with the version of the ZK node the
+  * values belong to. Instances are treated as immutable: each modifying method returns a new
+  * {@link Terms} (or {@code null} when nothing needs to change) and leaves this one untouched.
+  */
+ static class Terms {
+   // coreNodeName -> term; the map handed to the constructor is stored as-is (no copy is made)
+   private final Map<String, Long> values;
+   // ZK node version
+   private final int version;
+
+   public Terms() {
+     this(new HashMap<>(), 0);
+   }
+
+   public Terms(Map<String, Long> values, int version) {
+     this.values = values;
+     this.version = version;
+   }
+
+   /**
+    * Can this replica become leader, i.e. is its term equal to the highest term of the shard?
+    * A replica without a term is compared as if its term were 0.
+    * @param coreNodeName of the replica
+    * @return true if this replica can become leader (always true when no terms exist), false otherwise
+    */
+   boolean canBecomeLeader(String coreNodeName) {
+     if (values.isEmpty()) return true;
+     long maxTerm = Collections.max(values.values());
+     return values.getOrDefault(coreNodeName, 0L) == maxTerm;
+   }
+
+   /**
+    * @param coreNodeName of the replica
+    * @return the term of the replica, or {@code null} if it has none
+    */
+   Long getTerm(String coreNodeName) {
+     return values.get(coreNodeName);
+   }
+
+   /**
+    * Return a new {@link Terms} in which the term of {@code leader} is higher than the terms of
+    * {@code replicasNeedingRecovery}. Every replica that shares the leader's term and does not
+    * need recovery (the leader included) moves to the leader's term + 1; the version is kept.
+    * @param leader coreNodeName of leader
+    * @param replicasNeedingRecovery set of replicas whose terms should be lower than leader's term
+    * @return null if term of {@code leader} is already higher than {@code replicasNeedingRecovery}
+    * @throws SolrException if {@code leader} has no term
+    */
+   Terms increaseTerms(String leader, Set<String> replicasNeedingRecovery) {
+     if (!values.containsKey(leader)) {
+       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Can not find leader's term " + leader);
+     }
+
+     boolean changed = false;
+     boolean foundReplicasInLowerTerms = false;
+
+     HashMap<String, Long> newValues = new HashMap<>(values);
+     long leaderTerm = newValues.get(leader);
+     // setValue() on an existing entry is not a structural change, so it is safe while iterating
+     for (Map.Entry<String, Long> entry : newValues.entrySet()) {
+       boolean needsRecovery = replicasNeedingRecovery.contains(entry.getKey());
+       if (needsRecovery) foundReplicasInLowerTerms = true;
+       if (Objects.equals(entry.getValue(), leaderTerm)) {
+         if (needsRecovery) {
+           // this replica shares the leader's term, so the terms really have to move
+           changed = true;
+         } else {
+           entry.setValue(leaderTerm + 1);
+         }
+       }
+     }
+
+     // Returning null is an optimization that is only taken when at least one of
+     // replicasNeedingRecovery is present in the local terms; if none is present,
+     // this may indicate that the current value is stale
+     if (!changed && foundReplicasInLowerTerms) return null;
+     return new Terms(newValues, version);
+   }
+
+   /**
+    * Return a new {@link Terms} in which term of {@code coreNodeName} is removed
+    * @param coreNodeName of the replica
+    * @return null if {@code coreNodeName} has no term
+    */
+   Terms removeTerm(String coreNodeName) {
+     if (!values.containsKey(coreNodeName)) return null;
+
+     HashMap<String, Long> newValues = new HashMap<>(values);
+     newValues.remove(coreNodeName);
+     return new Terms(newValues, version);
+   }
+
+   /**
+    * Return a new {@link Terms} in which {@code coreNodeName} is registered with term 0
+    * @param coreNodeName of the replica
+    * @return null if {@code coreNodeName} already has a term
+    */
+   Terms registerTerm(String coreNodeName) {
+     if (values.containsKey(coreNodeName)) return null;
+
+     HashMap<String, Long> newValues = new HashMap<>(values);
+     newValues.put(coreNodeName, 0L);
+     return new Terms(newValues, version);
+   }
+
+   /**
+    * Return a new {@link Terms} in which the term of {@code coreNodeName} is max.
+    * A replica that has no term yet is given the maximum term as well.
+    * @param coreNodeName of the replica
+    * @return null if term of {@code coreNodeName} is already maximum
+    */
+   Terms setEqualsToMax(String coreNodeName) {
+     long maxTerm;
+     try {
+       maxTerm = Collections.max(values.values());
+     } catch (NoSuchElementException e) {
+       // no terms at all: fall back to 0
+       maxTerm = 0;
+     }
+     // null-safe comparison: values.get() returns null for a replica without a term, and
+     // unboxing it for an == comparison would throw a NullPointerException
+     if (Objects.equals(values.get(coreNodeName), maxTerm)) return null;
+
+     HashMap<String, Long> newValues = new HashMap<>(values);
+     newValues.put(coreNodeName, maxTerm);
+     return new Terms(newValues, version);
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 4c6ce47..428ad83 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
@@ -392,7 +393,22 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
public static void createCollectionZkNode(DistribStateManager stateManager, String collection, Map<String,String> params) {
log.debug("Check for collection zkNode:" + collection);
String collectionPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection;
-
+ // clean up old terms node
+ String termsPath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/terms";
+ try {
+ if (stateManager.hasData(termsPath)) {
+ List<String> paths = stateManager.listData(termsPath);
+ for (String path : paths) {
+ stateManager.removeData(termsPath + "/" + path, -1);
+ }
+ stateManager.removeData(termsPath, -1);
+ }
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Error deleting old term nodes for collection from Zookeeper", e);
+ } catch (KeeperException | IOException | BadVersionException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Error deleting old term nodes for collection from Zookeeper", e);
+ }
try {
if (!stateManager.hasData(collectionPath)) {
log.debug("Creating collection in ZooKeeper:" + collection);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index cebb2d0..dcc3de6 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -28,8 +28,10 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import java.util.function.BiConsumer;
import com.google.common.collect.ImmutableSet;
@@ -47,6 +49,7 @@ import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.cloud.OverseerTaskQueue;
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkShardTerms;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.cloud.rule.ReplicaAssigner;
@@ -1067,7 +1070,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
}
private static void forceLeaderElection(SolrQueryRequest req, CollectionsHandler handler) {
- ClusterState clusterState = handler.coreContainer.getZkController().getClusterState();
+ ZkController zkController = handler.coreContainer.getZkController();
+ ClusterState clusterState = zkController.getClusterState();
String collectionName = req.getParams().required().get(COLLECTION_PROP);
String sliceId = req.getParams().required().get(SHARD_ID_PROP);
@@ -1079,7 +1083,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
"No shard with name " + sliceId + " exists for collection " + collectionName);
}
- try {
+ try (ZkShardTerms zkShardTerms = new ZkShardTerms(collectionName, slice.getName(), zkController.getZkClient())) {
// if an active replica is the leader, then all is fine already
Replica leader = slice.getLeader();
if (leader != null && leader.getState() == State.ACTIVE) {
@@ -1096,20 +1100,37 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
handler.coreContainer.getZkController().getZkClient().clean(lirPath);
}
+ final Set<String> liveNodes = clusterState.getLiveNodes();
+ List<Replica> liveReplicas = slice.getReplicas().stream()
+ .filter(rep -> liveNodes.contains(rep.getNodeName())).collect(Collectors.toList());
+ boolean shouldIncreaseReplicaTerms = liveReplicas.stream()
+ .noneMatch(rep -> zkShardTerms.registered(rep.getName()) && zkShardTerms.canBecomeLeader(rep.getName()));
+ // we won't increase the replicas' terms if there exists a live replica whose term equals the leader's
+ if (shouldIncreaseReplicaTerms) {
+ OptionalLong optionalMaxTerm = liveReplicas.stream()
+ .filter(rep -> zkShardTerms.registered(rep.getName()))
+ .mapToLong(rep -> zkShardTerms.getTerm(rep.getName()))
+ .max();
+ // increase terms of replicas less out-of-sync
+ if (optionalMaxTerm.isPresent()) {
+ liveReplicas.stream()
+ .filter(rep -> zkShardTerms.getTerm(rep.getName()) == optionalMaxTerm.getAsLong())
+ .forEach(rep -> zkShardTerms.setEqualsToMax(rep.getName()));
+ }
+ }
+
// Call all live replicas to prepare themselves for leadership, e.g. set last published
// state to active.
- for (Replica rep : slice.getReplicas()) {
- if (clusterState.getLiveNodes().contains(rep.getNodeName())) {
- ShardHandler shardHandler = handler.coreContainer.getShardHandlerFactory().getShardHandler();
+ for (Replica rep : liveReplicas) {
+ ShardHandler shardHandler = handler.coreContainer.getShardHandlerFactory().getShardHandler();
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set(CoreAdminParams.ACTION, CoreAdminAction.FORCEPREPAREFORLEADERSHIP.toString());
- params.set(CoreAdminParams.CORE, rep.getStr("core"));
- String nodeName = rep.getNodeName();
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set(CoreAdminParams.ACTION, CoreAdminAction.FORCEPREPAREFORLEADERSHIP.toString());
+ params.set(CoreAdminParams.CORE, rep.getStr("core"));
+ String nodeName = rep.getNodeName();
- OverseerCollectionMessageHandler.sendShardRequest(nodeName, params, shardHandler, null, null,
- CommonParams.CORES_HANDLER_PATH, handler.coreContainer.getZkController().getZkStateReader()); // synchronous request
- }
+ OverseerCollectionMessageHandler.sendShardRequest(nodeName, params, shardHandler, null, null,
+ CommonParams.CORES_HANDLER_PATH, handler.coreContainer.getZkController().getZkStateReader()); // synchronous request
}
// Wait till we have an active leader
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index 0a6d5ce..3647735 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -21,6 +21,7 @@ import java.lang.invoke.MethodHandles;
import java.util.Objects;
import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ZkShardTerms;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@@ -124,6 +125,12 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
log.warn("Leader " + core.getName() + " ignoring request to be in the recovering state because it is live and active.");
}
+ ZkShardTerms shardTerms = coreContainer.getZkController().getShardTerms(collectionName, slice.getName());
+ // if the replica is waiting for leader to see recovery state, the leader should refresh its terms
+ if (waitForState == Replica.State.RECOVERING && shardTerms.registered(coreNodeName) && !shardTerms.canBecomeLeader(coreNodeName)) {
+ shardTerms.refreshTerms();
+ }
+
boolean onlyIfActiveCheckResult = onlyIfLeaderActive != null && onlyIfLeaderActive && localState != Replica.State.ACTIVE;
log.info("In WaitForState(" + waitForState + "): collection=" + collectionName + ", shard=" + slice.getName() +
", thisCore=" + core.getName() + ", leaderDoesNotNeedRecovery=" + leaderDoesNotNeedRecovery +
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
index b418a19..739604f 100644
--- a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
@@ -308,19 +308,20 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
// after the current one, and if there is, bail
boolean locked = recoveryLock.tryLock();
try {
- if (!locked) {
- if (recoveryWaiting.get() > 0) {
- return;
- }
- recoveryWaiting.incrementAndGet();
- } else {
- recoveryWaiting.incrementAndGet();
- cancelRecovery();
+ if (!locked && recoveryWaiting.get() > 0) {
+ return;
}
+
+ recoveryWaiting.incrementAndGet();
+ cancelRecovery();
recoveryLock.lock();
try {
- recoveryWaiting.decrementAndGet();
+ // don't use recoveryLock.getQueueLength() for this
+ if (recoveryWaiting.decrementAndGet() > 0) {
+ // another recovery waiting behind us, let it run now instead of after we finish
+ return;
+ }
// to be air tight we must also check after lock
if (cc.isShutDown()) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/27ef6530/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index de031a2..3cff171 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -44,6 +45,7 @@ import org.apache.solr.client.solrj.response.SimpleSolrResponse;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkShardTerms;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@@ -184,6 +186,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private final boolean cloneRequiredOnLeader;
private final Replica.Type replicaType;
+ @Deprecated
+ // this flag, used for testing rolling updates, should be removed by SOLR-11812
+ private final boolean isOldLIRMode;
+
public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
this(req, rsp, new AtomicUpdateDocumentMerger(req), next);
}
@@ -202,6 +208,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
this.ulog = req.getCore().getUpdateHandler().getUpdateLog();
this.vinfo = ulog == null ? null : ulog.getVersionInfo();
+ this.isOldLIRMode = !"new".equals(req.getCore().getCoreDescriptor().getCoreProperty("lirVersion", "new"));
versionsStored = this.vinfo != null && this.vinfo.getVersionField() != null;
returnVersions = req.getParams().getBool(UpdateParams.VERSIONS ,false);
@@ -343,13 +350,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
List<Node> nodes = new ArrayList<>(replicaProps.size());
+ ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
for (ZkCoreNodeProps props : replicaProps) {
- if (skipList != null) {
- boolean skip = skipListSet.contains(props.getCoreUrl());
- log.info("check url:" + props.getCoreUrl() + " against:" + skipListSet + " result:" + skip);
- if (!skip) {
- nodes.add(new StdNode(props, collection, shardId));
- }
+ String coreNodeName = ((Replica) props.getNodeProps()).getName();
+ if (skipList != null && skipListSet.contains(props.getCoreUrl())) {
+ log.info("check url:" + props.getCoreUrl() + " against:" + skipListSet + " result:true");
+ } else if(!isOldLIRMode && zkShardTerms.registered(coreNodeName) && !zkShardTerms.canBecomeLeader(coreNodeName)) {
+ log.info("skip url:{} cause its term is less than leader", props.getCoreUrl());
} else {
nodes.add(new StdNode(props, collection, shardId));
}
@@ -751,7 +758,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// TODO - we may need to tell about more than one error...
List<Error> errorsForClient = new ArrayList<>(errors.size());
-
+ Map<ShardInfo, Set<String>> failedReplicas = new HashMap<>();
for (final SolrCmdDistributor.Error error : errors) {
if (error.req.node instanceof RetryNode) {
@@ -843,18 +850,27 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
&& foundErrorNodeInReplicaList // we found an error for one of replicas
&& !stdNode.getNodeProps().getCoreUrl().equals(leaderProps.getCoreUrl())) { // we do not want to put ourself into LIR
try {
+ String coreNodeName = ((Replica) stdNode.getNodeProps().getNodeProps()).getName();
// if false, then the node is probably not "live" anymore
// and we do not need to send a recovery message
Throwable rootCause = SolrException.getRootCause(error.e);
- log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
- zkController.ensureReplicaInLeaderInitiatedRecovery(
- req.getCore().getCoreContainer(),
- collection,
- shardId,
- stdNode.getNodeProps(),
- req.getCore().getCoreDescriptor(),
- false /* forcePublishState */
- );
+ if (!isOldLIRMode && zkController.getShardTerms(collection, shardId).registered(coreNodeName)) {
+ log.error("Setting up to try to start recovery on replica {} with url {} by increasing leader term", coreNodeName, replicaUrl, rootCause);
+ ShardInfo shardInfo = new ShardInfo(collection, shardId, leaderCoreNodeName);
+ failedReplicas.putIfAbsent(shardInfo, new HashSet<>());
+ failedReplicas.get(shardInfo).add(coreNodeName);
+ } else {
+ // The replica did not register its term, so it must be running the old LIR implementation
+ log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
+ zkController.ensureReplicaInLeaderInitiatedRecovery(
+ req.getCore().getCoreContainer(),
+ collection,
+ shardId,
+ stdNode.getNodeProps(),
+ req.getCore().getCoreDescriptor(),
+ false /* forcePublishState */
+ );
+ }
} catch (Exception exc) {
Throwable setLirZnodeFailedCause = SolrException.getRootCause(exc);
log.error("Leader failed to set replica " +
@@ -873,6 +889,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
}
+ if (!isOldLIRMode) {
+ for (Map.Entry<ShardInfo, Set<String>> entry : failedReplicas.entrySet()) {
+ ShardInfo shardInfo = entry.getKey();
+ zkController.getShardTerms(shardInfo.collection, shardInfo.shard).ensureTermsIsHigher(shardInfo.leader, entry.getValue());
+ }
+ }
// in either case, we need to attach the achieved and min rf to the response.
if (leaderReplicationTracker != null || rollupReplicationTracker != null) {
int achievedRf = Integer.MAX_VALUE;
@@ -905,6 +927,38 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
+ /**
+  * Identifies a shard of a collection together with its leader. Instances are used as map keys,
+  * so the fields are final and {@link #equals(Object)} / {@link #hashCode()} are value-based.
+  * Declared static because it never touches the enclosing instance.
+  */
+ private static class ShardInfo {
+   private final String collection;
+   private final String shard;
+   private final String leader;
+
+   public ShardInfo(String collection, String shard, String leader) {
+     this.collection = collection;
+     this.shard = shard;
+     this.leader = leader;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+     if (this == o) return true;
+     if (o == null || getClass() != o.getClass()) return false;
+
+     ShardInfo other = (ShardInfo) o;
+     return collection.equals(other.collection)
+         && shard.equals(other.shard)
+         && leader.equals(other.leader);
+   }
+
+   @Override
+   public int hashCode() {
+     int result = collection.hashCode();
+     result = 31 * result + shard.hashCode();
+     result = 31 * result + leader.hashCode();
+     return result;
+   }
+ }
+
// must be synchronized by bucket
private void doLocalAdd(AddUpdateCommand cmd) throws IOException {