You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink target was not preserved in this plain-text copy).
Posted to commits@solr.apache.org by ho...@apache.org on 2021/11/16 21:06:01 UTC

[solr] branch main updated: SOLR-15795: Fix REPLACENODE to not use source node (#414)

This is an automated email from the ASF dual-hosted git repository.

houston pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new 70a19f9  SOLR-15795: Fix REPLACENODE to not use source node (#414)
70a19f9 is described below

commit 70a19f921b6b987c8fcca519edcbdc10cbe3598c
Author: Houston Putman <ho...@apache.org>
AuthorDate: Tue Nov 16 16:05:44 2021 -0500

    SOLR-15795: Fix REPLACENODE to not use source node (#414)
---
 solr/CHANGES.txt                                   |   2 +
 .../apache/solr/cloud/api/collections/Assign.java  |  28 +++-
 .../solr/cloud/api/collections/ReplaceNodeCmd.java | 151 +++++++++++----------
 .../org/apache/solr/cloud/ReplaceNodeTest.java     |  73 ++++++----
 4 files changed, 149 insertions(+), 105 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 7d07cfb..831904c 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -434,6 +434,8 @@ Bug Fixes
 
 * SOLR-15635: Don't close hooks twice when SolrRequestInfo is cleared twice; or /export with classic join
   closed fromCore if provided (Mikhail Khludnev, David Smiley)
+  
+* SOLR-15795: Fix REPLACENODE to not use source node when choosing a target node for new replicas (Houston Putman)
 
 ==================  8.11.0 ==================
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
index b399ed4..c6702ef 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
@@ -340,7 +340,7 @@ public class Assign {
     return nodeNameVsShardCount;
   }
 
-  // throw an exception if any node int the supplied list is not live.
+  // throw an exception if any node in the supplied list is not live.
   // Empty or null list always succeeds and returns the input.
   private static List<String> checkLiveNodes(List<String> createNodeList, ClusterState clusterState) {
     Set<String> liveNodes = clusterState.getLiveNodes();
@@ -348,7 +348,7 @@ public class Assign {
       if (!liveNodes.containsAll(createNodeList)) {
         throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
             "At least one of the node(s) specified " + createNodeList + " are not currently active in "
-                + createNodeList + ", no action taken.");
+                + liveNodes + ", no action taken.");
       }
       // the logic that was extracted to this method used to create a defensive copy but no code
       // was modifying the copy, if this method is made protected or public we want to go back to that
@@ -356,6 +356,26 @@ public class Assign {
     return createNodeList; // unmodified, but return for inline use
   }
 
+  // Throw an exception unless at least one node in the supplied list is live.
+  // A null or empty list contains no live node, so it always fails.
+  // Returns the input unmodified, for inline use.
+  private static List<String> checkAnyLiveNodes(List<String> createNodeList, ClusterState clusterState) {
+    Set<String> liveNodes = clusterState.getLiveNodes();
+    if (createNodeList == null) {
+      // normalize so the error message below prints "[]" rather than "null"
+      createNodeList = Collections.emptyList();
+    }
+    // anyMatch stops at the first live node instead of scanning the whole list
+    boolean anyLiveNodes = createNodeList.stream().anyMatch(liveNodes::contains);
+    if (!anyLiveNodes) {
+      // report both the requested nodes and the live nodes to aid diagnosis
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          "None of the node(s) specified " + createNodeList + " are currently active in "
+              + liveNodes + ", no action taken.");
+    }
+    return createNodeList; // reached only when the input contains a live node
+  }
+
   /**
    * Thrown if there is an exception while assigning nodes for replicas
    */
@@ -501,8 +521,8 @@ public class Assign {
         nodeList = sortedNodeList.stream().map(replicaCount -> replicaCount.nodeName).collect(Collectors.toList());
       }
 
-      // otherwise we get a div/0 below
-      assert !nodeList.isEmpty();
+      // Throw an error if there aren't any live nodes.
+      checkAnyLiveNodes(nodeList, solrCloudManager.getClusterStateProvider().getClusterState());
 
       int i = 0;
       List<ReplicaPosition> result = new ArrayList<>();
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
index 9ea6f6a..1d9307d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
@@ -27,6 +27,8 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
 import org.apache.solr.cloud.ActiveReplicaWatcher;
 import org.apache.solr.common.SolrCloseableLatch;
 import org.apache.solr.common.SolrException;
@@ -77,6 +79,8 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
     }
     if (target != null && !clusterState.liveNodesContain(target)) {
       throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target Node: " + target + " is not live");
+    } else if (clusterState.getLiveNodes().size() <= 1) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No nodes other than the source node: " + source + " are live, therefore replicas cannot be moved");
     }
     List<ZkNodeProps> sourceReplicas = getReplicasOfNode(source, clusterState);
     // how many leaders are we moving? for these replicas we have to make sure that either:
@@ -97,89 +101,88 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
     SolrCloseableLatch countDownLatch = new SolrCloseableLatch(sourceReplicas.size(), ccc.getCloseableToLatchOn());
 
     SolrCloseableLatch replicasToRecover = new SolrCloseableLatch(numLeaders, ccc.getCloseableToLatchOn());
-    try {
-      for (ZkNodeProps sourceReplica : sourceReplicas) {
-        String sourceCollection = sourceReplica.getStr(COLLECTION_PROP);
-        if (log.isInfoEnabled()) {
-          log.info("Going to create replica for collection={} shard={} on node={}", sourceCollection, sourceReplica.getStr(SHARD_ID_PROP), target);
-        }
-        String targetNode = target;
-        if (targetNode == null) {
-          Replica.Type replicaType = Replica.Type.get(sourceReplica.getStr(ZkStateReader.REPLICA_TYPE));
-          int numNrtReplicas = replicaType == Replica.Type.NRT ? 1 : 0;
-          int numTlogReplicas = replicaType == Replica.Type.TLOG ? 1 : 0;
-          int numPullReplicas = replicaType == Replica.Type.PULL ? 1 : 0;
-          Assign.AssignRequest assignRequest = new Assign.AssignRequestBuilder()
-              .forCollection(sourceCollection)
-              .forShard(Collections.singletonList(sourceReplica.getStr(SHARD_ID_PROP)))
-              .assignNrtReplicas(numNrtReplicas)
-              .assignTlogReplicas(numTlogReplicas)
-              .assignPullReplicas(numPullReplicas)
-              .onNodes(new ArrayList<>(ccc.getSolrCloudManager().getClusterStateProvider().getLiveNodes()))
-              .build();
-          Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(
-              ccc.getCoreContainer(),
-              clusterState, clusterState.getCollection(sourceCollection));
-          targetNode = assignStrategy.assign(ccc.getSolrCloudManager(), assignRequest).get(0).node;
-        }
-        ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, targetNode);
-        if (async != null) msg.getProperties().put(ASYNC, async);
-        NamedList<Object> nl = new NamedList<>();
-        final ZkNodeProps addedReplica = new AddReplicaCmd(ccc).addReplica(clusterState,
-            msg, nl, () -> {
-              countDownLatch.countDown();
-              if (nl.get("failure") != null) {
-                String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
-                    " on node=%s", sourceCollection, sourceReplica.getStr(SHARD_ID_PROP), target);
-                log.warn(errorString);
-                // one replica creation failed. Make the best attempt to
-                // delete all the replicas created so far in the target
-                // and exit
-                synchronized (results) {
-                  results.add("failure", errorString);
-                  anyOneFailed.set(true);
-                }
-              } else {
-                if (log.isDebugEnabled()) {
-                  log.debug("Successfully created replica for collection={} shard={} on node={}",
-                      sourceCollection, sourceReplica.getStr(SHARD_ID_PROP), target);
-                }
+
+    for (ZkNodeProps sourceReplica : sourceReplicas) {
+      String sourceCollection = sourceReplica.getStr(COLLECTION_PROP);
+      if (log.isInfoEnabled()) {
+        log.info("Going to create replica for collection={} shard={} on node={}", sourceCollection, sourceReplica.getStr(SHARD_ID_PROP), target);
+      }
+      String targetNode = target;
+      if (targetNode == null) {
+        Replica.Type replicaType = Replica.Type.get(sourceReplica.getStr(ZkStateReader.REPLICA_TYPE));
+        int numNrtReplicas = replicaType == Replica.Type.NRT ? 1 : 0;
+        int numTlogReplicas = replicaType == Replica.Type.TLOG ? 1 : 0;
+        int numPullReplicas = replicaType == Replica.Type.PULL ? 1 : 0;
+        Assign.AssignRequest assignRequest = new Assign.AssignRequestBuilder()
+            .forCollection(sourceCollection)
+            .forShard(Collections.singletonList(sourceReplica.getStr(SHARD_ID_PROP)))
+            .assignNrtReplicas(numNrtReplicas)
+            .assignTlogReplicas(numTlogReplicas)
+            .assignPullReplicas(numPullReplicas)
+            .onNodes(ccc.getSolrCloudManager().getClusterStateProvider().getLiveNodes().stream().filter(node -> !node.equals(source)).collect(Collectors.toList()))
+            .build();
+        Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(
+            ccc.getCoreContainer(),
+            clusterState, clusterState.getCollection(sourceCollection));
+        targetNode = assignStrategy.assign(ccc.getSolrCloudManager(), assignRequest).get(0).node;
+      }
+      ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, targetNode);
+      if (async != null) msg.getProperties().put(ASYNC, async);
+      NamedList<Object> nl = new NamedList<>();
+      final ZkNodeProps addedReplica = new AddReplicaCmd(ccc).addReplica(clusterState,
+          msg, nl, () -> {
+            countDownLatch.countDown();
+            if (nl.get("failure") != null) {
+              String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
+                  " on node=%s", sourceCollection, sourceReplica.getStr(SHARD_ID_PROP), target);
+              log.warn(errorString);
+              // one replica creation failed. Make the best attempt to
+              // delete all the replicas created so far in the target
+              // and exit
+              synchronized (results) {
+                results.add("failure", errorString);
+                anyOneFailed.set(true);
               }
-            }).get(0);
-
-        if (addedReplica != null) {
-          createdReplicas.add(addedReplica);
-          if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) {
-            String shardName = sourceReplica.getStr(SHARD_ID_PROP);
-            String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
-            String collectionName = sourceCollection;
-            String key = collectionName + "_" + replicaName;
-            CollectionStateWatcher watcher;
-            if (waitForFinalState) {
-              watcher = new ActiveReplicaWatcher(collectionName, null,
-                  Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)), replicasToRecover);
             } else {
-              watcher = new LeaderRecoveryWatcher(collectionName, shardName, replicaName,
-                  addedReplica.getStr(ZkStateReader.CORE_NAME_PROP), replicasToRecover);
+              if (log.isDebugEnabled()) {
+                log.debug("Successfully created replica for collection={} shard={} on node={}",
+                    sourceCollection, sourceReplica.getStr(SHARD_ID_PROP), target);
+              }
             }
-            watchers.put(key, watcher);
-            log.debug("--- adding {}, {}", key, watcher);
-            zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
+          }).get(0);
+
+      if (addedReplica != null) {
+        createdReplicas.add(addedReplica);
+        if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) {
+          String shardName = sourceReplica.getStr(SHARD_ID_PROP);
+          String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
+          String collectionName = sourceCollection;
+          String key = collectionName + "_" + replicaName;
+          CollectionStateWatcher watcher;
+          if (waitForFinalState) {
+            watcher = new ActiveReplicaWatcher(collectionName, null,
+                Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)), replicasToRecover);
           } else {
-            log.debug("--- not waiting for {}", addedReplica);
+            watcher = new LeaderRecoveryWatcher(collectionName, shardName, replicaName,
+                addedReplica.getStr(ZkStateReader.CORE_NAME_PROP), replicasToRecover);
           }
+          watchers.put(key, watcher);
+          log.debug("--- adding {}, {}", key, watcher);
+          zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
+        } else {
+          log.debug("--- not waiting for {}", addedReplica);
         }
       }
+    }
 
-      log.debug("Waiting for replicas to be added");
-      if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
-        log.info("Timed out waiting for replicas to be added");
-        anyOneFailed.set(true);
-      } else {
-        log.debug("Finished waiting for replicas to be added");
-      }
-    } finally {
+    log.debug("Waiting for replicas to be added");
+    if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
+      log.info("Timed out waiting for replicas to be added");
+      anyOneFailed.set(true);
+    } else {
+      log.debug("Finished waiting for replicas to be added");
     }
+
     // now wait for leader replicas to recover
     log.debug("Waiting for {} leader replicas to recover", numLeaders);
     if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
index 7c0d165..917603a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
@@ -22,7 +22,6 @@ import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -35,6 +34,7 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.client.solrj.response.CoreAdminResponse;
 import org.apache.solr.client.solrj.response.RequestStatusState;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
@@ -44,6 +44,7 @@ import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.StrUtils;
 import org.apache.solr.metrics.MetricsMap;
 import org.apache.solr.metrics.SolrMetricManager;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -54,9 +55,12 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
   @BeforeClass
   public static void setupCluster() throws Exception {
     System.setProperty("metricsEnabled", "true");
-    configureCluster(6)
-        .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
-        .configure();
+  }
+
+  @Before
+  public void clearPreviousCluster() throws Exception {
+    // Each test configures its own cluster size, so discard any cluster left over from an earlier test.
+    shutdownCluster();
   }
 
   protected String getSolrXml() {
@@ -65,6 +69,9 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
 
   @Test
   public void test() throws Exception {
+    configureCluster(6)
+        .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
+        .configure();
     String coll = "replacenodetest_coll";
     if (log.isInfoEnabled()) {
       log.info("total_jettys: {}", cluster.getJettySolrRunners().size());
@@ -79,23 +86,23 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
     CollectionAdminRequest.Create create;
     // NOTE: always using the createCollection that takes in 'int' for all types of replicas, so we never
     // have to worry about null checking when comparing the Create command with the final Slices
-    
+
     // TODO: tlog replicas do not work correctly in tests due to fault TestInjection#waitForInSyncWithLeader
     create = pickRandom(
-                        CollectionAdminRequest.createCollection(coll, "conf1", 5, 2,0,0),
-                        //CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,1,0),
-                        //CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,1,1),
-                        //CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,0,1),
-                        //CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,2,0),
-                        // check also replicationFactor 1
-                        CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,0,0)
-                        //CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,1,0)
+        CollectionAdminRequest.createCollection(coll, "conf1", 5, 2,0,0),
+        //CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,1,0),
+        //CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,1,1),
+        //CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,0,1),
+        //CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,2,0),
+        // check also replicationFactor 1
+        CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,0,0)
+        //CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,1,0)
     );
     create.setCreateNodeSet(StrUtils.join(l, ','));
     cloudClient.request(create);
-    
+
     cluster.waitForActiveCollection(coll, 5, 5 * (create.getNumNrtReplicas() + create.getNumPullReplicas() + create.getNumTlogReplicas()));
-    
+
     DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(coll);
     log.debug("### Before decommission: {}", collection);
     log.info("excluded_node : {}  ", emptyNode);
@@ -108,13 +115,13 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
         success = true;
         break;
       }
-      assertFalse(rsp.getRequestStatus() == RequestStatusState.FAILED);
+      assertNotSame(rsp.getRequestStatus(), RequestStatusState.FAILED);
       Thread.sleep(50);
     }
     assertTrue(success);
     try (HttpSolrClient coreclient = getHttpSolrClient(cloudClient.getZkStateReader().getBaseUrlForNodeName(node2bdecommissioned))) {
       CoreAdminResponse status = CoreAdminRequest.getStatus(null, coreclient);
-      assertTrue(status.getCoreStatus().size() == 0);
+      assertEquals(0, status.getCoreStatus().size());
     }
 
     Thread.sleep(5000);
@@ -139,7 +146,7 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
         success = true;
         break;
       }
-      assertFalse(rsp.getRequestStatus() == RequestStatusState.FAILED);
+      assertNotSame(rsp.getRequestStatus(), RequestStatusState.FAILED);
       Thread.sleep(50);
     }
     assertTrue(success);
@@ -157,14 +164,7 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
     }
     // make sure all newly created replicas on node are active
     List<Replica> newReplicas = collection.getReplicas(node2bdecommissioned);
-    replicas.forEach(r -> {
-      for (Iterator<Replica> it = newReplicas.iterator(); it.hasNext(); ) {
-        Replica nr = it.next();
-        if (nr.getName().equals(r.getName())) {
-          it.remove();
-        }
-      }
-    });
+    replicas.forEach(r -> newReplicas.removeIf(nr -> nr.getName().equals(r.getName())));
     assertFalse(newReplicas.isEmpty());
     for (Replica r : newReplicas) {
       assertEquals(r.toString(), Replica.State.ACTIVE, r.getState());
@@ -173,7 +173,7 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
     replicas = collection.getReplicas(emptyNode);
     if (replicas != null) {
       for (Replica r : replicas) {
-        assertFalse(r.toString(), Replica.State.ACTIVE.equals(r.getState()));
+        assertNotEquals(r.toString(), Replica.State.ACTIVE, r.getState());
       }
     }
 
@@ -210,6 +210,25 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
 
   }
 
+  @Test
+  public void testFailOnSingleNode() throws Exception {
+    configureCluster(1)
+        .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
+        .configure();
+    String coll = "replacesinglenodetest_coll";
+    if (log.isInfoEnabled()) {
+      log.info("total_jettys: {}", cluster.getJettySolrRunners().size());
+    }
+
+    CloudSolrClient cloudClient = cluster.getSolrClient();
+    cloudClient.request(CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,0,0));
+    cluster.waitForActiveCollection(coll, 5, 5);
+
+    String liveNode = cloudClient.getZkStateReader().getClusterState().getLiveNodes().iterator().next();
+    SolrException e = expectThrows(SolrException.class, () -> createReplaceNodeRequest(liveNode, null, null).process(cloudClient));
+    assertTrue(e.getMessage(), e.getMessage().contains("No nodes other than the source node"));
+  }
+
   public static  CollectionAdminRequest.AsyncCollectionAdminRequest createReplaceNodeRequest(String sourceNode, String targetNode, Boolean parallel) {
     if (random().nextBoolean()) {
       return new CollectionAdminRequest.ReplaceNode(sourceNode, targetNode).setParallel(parallel);