You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2021/03/30 11:16:12 UTC

[lucene-solr] 01/01: SOLR-15288: Fix the node restart bug

This is an automated email from the ASF dual-hosted git repository.

noble pushed a commit to branch jira/solr15288
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 41157ca739143466b8dca4f280579e4731f51938
Author: Noble Paul <no...@gmail.com>
AuthorDate: Tue Mar 30 22:15:18 2021 +1100

    SOLR-15288: Fix the node restart bug
---
 .../src/java/org/apache/solr/cloud/Overseer.java   |  2 +-
 .../apache/solr/cloud/overseer/NodeMutator.java    | 12 +++-
 .../org/apache/solr/cloud/NodeMutatorTest.java     |  2 +-
 .../test/org/apache/solr/cloud/OverseerTest.java   |  2 +-
 .../client/solrj/impl/CloudSolrClientTest.java     | 66 +++++++++++++++++++++-
 5 files changed, 78 insertions(+), 6 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 4ca5703..e7b35b1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -456,7 +456,7 @@ public class Overseer implements SolrCloseable {
             }
             break;
           case DOWNNODE:
-            return new NodeMutator().downNode(clusterState, message);
+            return new NodeMutator(getSolrCloudManager()).downNode(clusterState, message);
           default:
             throw new RuntimeException("unknown operation:" + operation + " contents:" + message.getProperties());
         }
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
index e8db2b4..cf9eaf9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
@@ -24,19 +24,28 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.PerReplicaStatesOps;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.solr.cloud.overseer.SliceMutator.getZkClient;
+
 public class NodeMutator {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  protected SolrZkClient zkClient;
+  public NodeMutator(SolrCloudManager cloudManager) { // NOTE(review): tests pass null here - confirm getZkClient tolerates a null manager
+    this.zkClient = getZkClient(cloudManager);
+  }
 
   public List<ZkWriteCommand> downNode(ClusterState clusterState, ZkNodeProps message) {
     List<ZkWriteCommand> zkWriteCommands = new ArrayList<>();
@@ -81,8 +90,9 @@ public class NodeMutator {
 
       if (needToUpdateCollection) {
         if (docCollection.isPerReplicaState()) {
+          PerReplicaStates fetch = PerReplicaStates.fetch(docCollection.getZNode(), zkClient, docCollection.getPerReplicaStates());
           zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy),
-              PerReplicaStatesOps.downReplicas(downedReplicas, docCollection.getPerReplicaStates()), false));
+              PerReplicaStatesOps.downReplicas(downedReplicas, fetch), false));
         } else {
           zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy)));
         }
diff --git a/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java b/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
index d49194d..b8279e3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/NodeMutatorTest.java
@@ -43,7 +43,7 @@ public class NodeMutatorTest extends SolrTestCaseJ4Test {
 
   @Test
   public void downNodeReportsAllImpactedCollectionsAndNothingElse() throws IOException {
-    NodeMutator nm = new NodeMutator();
+    NodeMutator nm = new NodeMutator(null);
 
     //We use 2 nodes with maxShardsPerNode as 1
     //Collection1: 2 shards X 1 replica = replica1 on node1 and replica2 on node2
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index eb0e7a5..f4b2559 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -520,7 +520,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
         }
         ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.DOWNNODE.toLower(),
             ZkStateReader.NODE_NAME_PROP, "127.0.0.1:8983_solr");
-        List<ZkWriteCommand> commands = new NodeMutator().downNode(reader.getClusterState(), m);
+        List<ZkWriteCommand> commands = new NodeMutator(null).downNode(reader.getClusterState(), m);
 
         ZkDistributedQueue q = overseers.get(0).getStateUpdateQueue();
 
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
index b85499d..d3141b7 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
@@ -32,6 +32,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -53,6 +55,7 @@ import org.apache.solr.client.solrj.response.RequestStatusState;
 import org.apache.solr.client.solrj.response.SolrPingResponse;
 import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.cloud.AbstractDistribZkTestBase;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
@@ -91,7 +94,7 @@ import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
  * This test would be faster if we simulated the zk state instead.
  */
 @Slow
-@LogLevel("org.apache.solr.cloud.Overseer=INFO;org.apache.solr.common.cloud=INFO;org.apache.solr.cloud.api.collections=INFO;org.apache.solr.cloud.overseer=INFO")
+@LogLevel("org.apache.solr.common.cloud.PerReplicaStatesOps=DEBUG;org.apache.solr.cloud.Overseer=INFO;org.apache.solr.common.cloud=INFO;org.apache.solr.cloud.api.collections=INFO;org.apache.solr.cloud.overseer=INFO")
 public class CloudSolrClientTest extends SolrCloudTestCase {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -1099,7 +1102,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     c.forEachReplica((s, replica) -> assertNotNull(replica.getReplicaState()));
     PerReplicaStates prs = PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), cluster.getZkClient(), null);
     assertEquals(4, prs.states.size());
-
+    JettySolrRunner jsr = cluster.startJettySolrRunner();
     // Now let's do an add replica
     CollectionAdminRequest
         .addReplicaToShard(testCollection, "shard1")
@@ -1120,4 +1123,63 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
     assertEquals(4, prs.states.size());
   }
 
+  public void testPRSRestart() throws Exception { // stops and restarts a node hosting a per-replica-state replica; all replicas must end up ACTIVE
+    String testCollection = "prs_restart_test";
+    MiniSolrCloudCluster cluster1 =
+        configureCluster(1)
+            .addConfig("conf1", getFile("solrj").toPath().resolve("solr").resolve("configsets").resolve("streaming").resolve("conf"))
+            .withJettyConfig(jetty -> jetty.enableV2(true))
+            .configure();
+    try {
+      CollectionAdminRequest.createCollection(testCollection, "conf1", 1, 1)
+          .setPerReplicaState(Boolean.TRUE)
+          .process(cluster1.getSolrClient());
+      cluster1.waitForActiveCollection(testCollection, 1, 1);
+      // All requests and ZK reads go through cluster1, the cluster this test builds and shuts down, not the class-level cluster.
+      DocCollection c = cluster1.getSolrClient().getZkStateReader().getCollection(testCollection);
+      c.forEachReplica((s, replica) -> assertNotNull(replica.getReplicaState()));
+      String collectionPath = ZkStateReader.getCollectionPath(testCollection);
+      PerReplicaStates prs = PerReplicaStates.fetch(collectionPath, cluster1.getZkClient(), null);
+      assertEquals(1, prs.states.size());
+      // Start a second node; the replica added below is looked up by this node's name further down.
+      JettySolrRunner jsr = cluster1.startJettySolrRunner();
+      assertEquals(2, cluster1.getJettySolrRunners().size());
+
+      // Add a second replica to shard1 and wait until both replicas are active
+      CollectionAdminRequest
+          .addReplicaToShard(testCollection, "shard1")
+          .process(cluster1.getSolrClient());
+      cluster1.waitForActiveCollection(testCollection, 1, 2);
+      prs = PerReplicaStates.fetch(collectionPath, cluster1.getZkClient(), null);
+      assertEquals(2, prs.states.size());
+      c = cluster1.getSolrClient().getZkStateReader().getCollection(testCollection);
+      prs.states.forEachEntry((s, state) -> assertEquals(Replica.State.ACTIVE, state.state));
+      // Find the replica hosted on the newly started node.
+      String replicaName = null;
+      for (Replica r : c.getSlice("shard1").getReplicas()) {
+        if (r.getNodeName().equals(jsr.getNodeName())) {
+          replicaName = r.getName();
+        }
+      }
+      // NOTE(review): if no replica landed on that node, the restart check below is skipped silently.
+      if (replicaName != null) {
+        log.info("restarting the node : {}, state.json v: {} downreplica :{}", jsr.getNodeName(), c.getZNodeVersion(), replicaName);
+        jsr.stop();
+        c = cluster1.getSolrClient().getZkStateReader().getCollection(testCollection);
+        log.info("after down node, state.json v: {}", c.getZNodeVersion());
+        prs = PerReplicaStates.fetch(collectionPath, cluster1.getZkClient(), null);
+        PerReplicaStates.State st = prs.get(replicaName);
+        assertNotEquals(Replica.State.ACTIVE, st.state);
+        jsr.start();
+        cluster1.waitForActiveCollection(testCollection, 1, 2);
+        prs = PerReplicaStates.fetch(collectionPath, cluster1.getZkClient(), null);
+        prs.states.forEachEntry((s, state) -> assertEquals(Replica.State.ACTIVE, state.state));
+      }
+
+    } finally {
+      cluster1.shutdown();
+      log.info("SHUTDOWN CLUSTER");
+    }
+
+  }
 }