You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2021/03/30 11:16:12 UTC
[lucene-solr] 01/01: SOLR-15288: Fix the node restart bug
This is an automated email from the ASF dual-hosted git repository.
noble pushed a commit to branch jira/solr15288
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 41157ca739143466b8dca4f280579e4731f51938
Author: Noble Paul <no...@gmail.com>
AuthorDate: Tue Mar 30 22:15:18 2021 +1100
SOLR-15288: Fix the node restart bug
---
.../src/java/org/apache/solr/cloud/Overseer.java | 2 +-
.../apache/solr/cloud/overseer/NodeMutator.java | 12 +++-
.../org/apache/solr/cloud/NodeMutatorTest.java | 2 +-
.../test/org/apache/solr/cloud/OverseerTest.java | 2 +-
.../client/solrj/impl/CloudSolrClientTest.java | 66 +++++++++++++++++++++-
5 files changed, 78 insertions(+), 6 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 4ca5703..e7b35b1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -456,7 +456,7 @@ public class Overseer implements SolrCloseable {
}
break;
case DOWNNODE:
- return new NodeMutator().downNode(clusterState, message);
+ return new NodeMutator(getSolrCloudManager()).downNode(clusterState, message);
default:
throw new RuntimeException("unknown operation:" + operation + " contents:" + message.getProperties());
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
index e8db2b4..cf9eaf9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
@@ -24,19 +24,28 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.PerReplicaStatesOps;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.cloud.overseer.SliceMutator.getZkClient;
+
public class NodeMutator {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ protected SolrZkClient zkClient; // ZooKeeper client; downNode passes it to PerReplicaStates.fetch for per-replica-state collections
+ public NodeMutator(SolrCloudManager cloudManager) { // the unit tests touched by this commit construct it with null
+ zkClient = getZkClient(cloudManager); // statically imported SliceMutator.getZkClient; NOTE(review): behavior for a null manager is not visible here - confirm
+ }
public List<ZkWriteCommand> downNode(ClusterState clusterState, ZkNodeProps message) {
List<ZkWriteCommand> zkWriteCommands = new ArrayList<>();
@@ -81,8 +90,9 @@ public class NodeMutator {
if (needToUpdateCollection) {
if (docCollection.isPerReplicaState()) {
+ PerReplicaStates fetch = PerReplicaStates.fetch(docCollection.getZNode(), zkClient, docCollection.getPerReplicaStates());
zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy),
- PerReplicaStatesOps.downReplicas(downedReplicas, docCollection.getPerReplicaStates()), false));
+ PerReplicaStatesOps.downReplicas(downedReplicas, fetch), false));
} else {
zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy)));
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java b/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
index d49194d..b8279e3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
@@ -43,7 +43,7 @@ public class NodeMutatorTest extends SolrTestCaseJ4Test {
@Test
public void downNodeReportsAllImpactedCollectionsAndNothingElse() throws IOException {
- NodeMutator nm = new NodeMutator();
+ NodeMutator nm = new NodeMutator(null);
//We use 2 nodes with maxShardsPerNode as 1
//Collection1: 2 shards X 1 replica = replica1 on node1 and replica2 on node2
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index eb0e7a5..f4b2559 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -520,7 +520,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
}
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DOWNNODE.toLower(),
ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr");
- List<ZkWriteCommand> commands = new NodeMutator().downNode(reader.getClusterState(), m);
+ List<ZkWriteCommand> commands = new NodeMutator(null).downNode(reader.getClusterState(), m);
ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
index b85499d..d3141b7 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
@@ -32,6 +32,8 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -53,6 +55,7 @@ import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.client.solrj.response.SolrPingResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.AbstractDistribZkTestBase;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
@@ -91,7 +94,7 @@ import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
* This test would be faster if we simulated the zk state instead.
*/
@Slow
-@LogLevel("org.apache.solr.cloud.Overseer=INFO;org.apache.solr.common.cloud=INFO;org.apache.solr.cloud.api.collections=INFO;org.apache.solr.cloud.overseer=INFO")
+@LogLevel("org.apache.solr.common.cloud.PerReplicaStatesOps=DEBUG;org.apache.solr.cloud.Overseer=INFO;org.apache.solr.common.cloud=INFO;org.apache.solr.cloud.api.collections=INFO;org.apache.solr.cloud.overseer=INFO")
public class CloudSolrClientTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -1099,7 +1102,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
c.forEachReplica((s, replica) -> assertNotNull(replica.getReplicaState()));
PerReplicaStates prs = PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), cluster.getZkClient(), null);
assertEquals(4, prs.states.size());
-
+ JettySolrRunner jsr = cluster.startJettySolrRunner();
// Now let's do an add replica
CollectionAdminRequest
.addReplicaToShard(testCollection, "shard1")
@@ -1120,4 +1123,63 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
assertEquals(4, prs.states.size());
}
+ /**
+  * Verifies that a collection created with per-replica states (PRS) survives a node restart.
+  *
+  * <p>Steps: start a dedicated one-node cluster, create a 1x1 PRS collection, start a second
+  * node, add a replica, stop the second node and check that its replica is no longer ACTIVE,
+  * then start the node again and check that every replica returns to ACTIVE.
+  *
+  * <p>All requests and ZooKeeper reads go through {@code cluster1}, the cluster built by this
+  * test, rather than through the inherited {@code cluster} field.
+  */
+ public void testPRSRestart() throws Exception {
+   String testCollection = "prs_restart_test";
+   MiniSolrCloudCluster cluster1 =
+       configureCluster(1)
+           .addConfig("conf1", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
+           .withJettyConfig(jetty -> jetty.enableV2(true))
+           .configure();
+   try {
+     CollectionAdminRequest.createCollection(testCollection, "conf1", 1, 1)
+         .setPerReplicaState(Boolean.TRUE)
+         .process(cluster1.getSolrClient());
+     cluster1.waitForActiveCollection(testCollection, 1, 1);
+
+     DocCollection c = cluster1.getSolrClient().getZkStateReader().getCollection(testCollection);
+     c.forEachReplica((s, replica) -> assertNotNull(replica.getReplicaState()));
+     String collectionPath = ZkStateReader.getCollectionPath(testCollection);
+     PerReplicaStates prs = PerReplicaStates.fetch(collectionPath, cluster1.getZkClient(), null);
+     assertEquals(1, prs.states.size());
+
+     // Second node: this is the one that gets stopped and restarted below.
+     JettySolrRunner jsr = cluster1.startJettySolrRunner();
+     assertEquals(2, cluster1.getJettySolrRunners().size());
+
+     // Now let's do an add replica
+     CollectionAdminRequest
+         .addReplicaToShard(testCollection, "shard1")
+         .process(cluster1.getSolrClient());
+     cluster1.waitForActiveCollection(testCollection, 1, 2);
+     prs = PerReplicaStates.fetch(collectionPath, cluster1.getZkClient(), null);
+     assertEquals(2, prs.states.size());
+     c = cluster1.getSolrClient().getZkStateReader().getCollection(testCollection);
+     prs.states.forEachEntry((s, state) -> assertEquals(Replica.State.ACTIVE, state.state));
+
+     // Find the replica hosted on the node that is about to be restarted.
+     String replicaName = null;
+     for (Replica r : c.getSlice("shard1").getReplicas()) {
+       if (r.getNodeName().equals(jsr.getNodeName())) {
+         replicaName = r.getName();
+       }
+     }
+     // Fail loudly instead of silently skipping the restart check when no replica is found.
+     assertNotNull("no replica of shard1 found on the newly started node", replicaName);
+
+     log.info("restarting the node : {}, state.json v: {} downreplica :{}", jsr.getNodeName(), c.getZNodeVersion(), replicaName);
+     jsr.stop();
+     c = cluster1.getSolrClient().getZkStateReader().getCollection(testCollection);
+     log.info("after down node, state.json v: {}", c.getZNodeVersion());
+     prs = PerReplicaStates.fetch(collectionPath, cluster1.getZkClient(), null);
+     PerReplicaStates.State st = prs.get(replicaName);
+     assertNotEquals(Replica.State.ACTIVE, st.state);
+
+     // After the restart every replica must come back as ACTIVE.
+     jsr.start();
+     cluster1.waitForActiveCollection(testCollection, 1, 2);
+     prs = PerReplicaStates.fetch(collectionPath, cluster1.getZkClient(), null);
+     prs.states.forEachEntry((s, state) -> assertEquals(Replica.State.ACTIVE, state.state));
+
+   } finally {
+     cluster1.shutdown();
+     log.info("SHUTDOWN CLUSTER");
+   }
+
+ }
}