You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ho...@apache.org on 2021/11/16 21:06:01 UTC
[solr] branch main updated: SOLR-15795: Fix REPLACENODE to not use source node (#414)
This is an automated email from the ASF dual-hosted git repository.
houston pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 70a19f9 SOLR-15795: Fix REPLACENODE to not use source node (#414)
70a19f9 is described below
commit 70a19f921b6b987c8fcca519edcbdc10cbe3598c
Author: Houston Putman <ho...@apache.org>
AuthorDate: Tue Nov 16 16:05:44 2021 -0500
SOLR-15795: Fix REPLACENODE to not use source node (#414)
---
solr/CHANGES.txt | 2 +
.../apache/solr/cloud/api/collections/Assign.java | 28 +++-
.../solr/cloud/api/collections/ReplaceNodeCmd.java | 151 +++++++++++----------
.../org/apache/solr/cloud/ReplaceNodeTest.java | 73 ++++++----
4 files changed, 149 insertions(+), 105 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 7d07cfb..831904c 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -434,6 +434,8 @@ Bug Fixes
* SOLR-15635: Don't close hooks twice when SolrRequestInfo is cleared twice; or /export with classic join
closed fromCore if provided (Mikhail Khludnev, David Smiley)
+
+* SOLR-15795: Fix REPLACENODE to not use source node when choosing a target node for new replicas (Houston Putman)
================== 8.11.0 ==================
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
index b399ed4..c6702ef 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/Assign.java
@@ -340,7 +340,7 @@ public class Assign {
return nodeNameVsShardCount;
}
- // throw an exception if any node int the supplied list is not live.
+ // throw an exception if any node in the supplied list is not live.
// Empty or null list always succeeds and returns the input.
private static List<String> checkLiveNodes(List<String> createNodeList, ClusterState clusterState) {
Set<String> liveNodes = clusterState.getLiveNodes();
@@ -348,7 +348,7 @@ public class Assign {
if (!liveNodes.containsAll(createNodeList)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"At least one of the node(s) specified " + createNodeList + " are not currently active in "
- + createNodeList + ", no action taken.");
+ + liveNodes + ", no action taken.");
}
// the logic that was extracted to this method used to create a defensive copy but no code
// was modifying the copy, if this method is made protected or public we want to go back to that
@@ -356,6 +356,26 @@ public class Assign {
return createNodeList; // unmodified, but return for inline use
}
+ // throw an exception if none of the nodes in the supplied list are live.
+ // A null list is treated as empty, and an empty list also fails.
+ // Returns the input list unchanged.
+ private static List<String> checkAnyLiveNodes(List<String> createNodeList, ClusterState clusterState) {
+ Set<String> liveNodes = clusterState.getLiveNodes();
+ if (createNodeList == null) {
+ createNodeList = Collections.emptyList();
+ }
+ boolean anyLiveNodes = false;
+ for (String node : createNodeList) {
+ anyLiveNodes |= liveNodes.contains(node);
+ }
+ if (!anyLiveNodes) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "None of the node(s) specified " + createNodeList + " are currently active in "
+ + liveNodes + ", no action taken.");
+ }
+ return createNodeList; // unmodified, but return for inline use. Only modified if empty, and that will throw an error
+ }
+
/**
* Thrown if there is an exception while assigning nodes for replicas
*/
@@ -501,8 +521,8 @@ public class Assign {
nodeList = sortedNodeList.stream().map(replicaCount -> replicaCount.nodeName).collect(Collectors.toList());
}
- // otherwise we get a div/0 below
- assert !nodeList.isEmpty();
+ // Throw an error if none of the candidate nodes are live; this also prevents a div/0 below on an empty list.
+ checkAnyLiveNodes(nodeList, solrCloudManager.getClusterStateProvider().getClusterState());
int i = 0;
List<ReplicaPosition> result = new ArrayList<>();
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
index 9ea6f6a..1d9307d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReplaceNodeCmd.java
@@ -27,6 +27,8 @@ import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
import org.apache.solr.cloud.ActiveReplicaWatcher;
import org.apache.solr.common.SolrCloseableLatch;
import org.apache.solr.common.SolrException;
@@ -77,6 +79,8 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
}
if (target != null && !clusterState.liveNodesContain(target)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Target Node: " + target + " is not live");
+ } else if (clusterState.getLiveNodes().size() <= 1) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No nodes other than the source node: " + source + " are live, therefore replicas cannot be moved");
}
List<ZkNodeProps> sourceReplicas = getReplicasOfNode(source, clusterState);
// how many leaders are we moving? for these replicas we have to make sure that either:
@@ -97,89 +101,88 @@ public class ReplaceNodeCmd implements CollApiCmds.CollectionApiCommand {
SolrCloseableLatch countDownLatch = new SolrCloseableLatch(sourceReplicas.size(), ccc.getCloseableToLatchOn());
SolrCloseableLatch replicasToRecover = new SolrCloseableLatch(numLeaders, ccc.getCloseableToLatchOn());
- try {
- for (ZkNodeProps sourceReplica : sourceReplicas) {
- String sourceCollection = sourceReplica.getStr(COLLECTION_PROP);
- if (log.isInfoEnabled()) {
- log.info("Going to create replica for collection={} shard={} on node={}", sourceCollection, sourceReplica.getStr(SHARD_ID_PROP), target);
- }
- String targetNode = target;
- if (targetNode == null) {
- Replica.Type replicaType = Replica.Type.get(sourceReplica.getStr(ZkStateReader.REPLICA_TYPE));
- int numNrtReplicas = replicaType == Replica.Type.NRT ? 1 : 0;
- int numTlogReplicas = replicaType == Replica.Type.TLOG ? 1 : 0;
- int numPullReplicas = replicaType == Replica.Type.PULL ? 1 : 0;
- Assign.AssignRequest assignRequest = new Assign.AssignRequestBuilder()
- .forCollection(sourceCollection)
- .forShard(Collections.singletonList(sourceReplica.getStr(SHARD_ID_PROP)))
- .assignNrtReplicas(numNrtReplicas)
- .assignTlogReplicas(numTlogReplicas)
- .assignPullReplicas(numPullReplicas)
- .onNodes(new ArrayList<>(ccc.getSolrCloudManager().getClusterStateProvider().getLiveNodes()))
- .build();
- Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(
- ccc.getCoreContainer(),
- clusterState, clusterState.getCollection(sourceCollection));
- targetNode = assignStrategy.assign(ccc.getSolrCloudManager(), assignRequest).get(0).node;
- }
- ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, targetNode);
- if (async != null) msg.getProperties().put(ASYNC, async);
- NamedList<Object> nl = new NamedList<>();
- final ZkNodeProps addedReplica = new AddReplicaCmd(ccc).addReplica(clusterState,
- msg, nl, () -> {
- countDownLatch.countDown();
- if (nl.get("failure") != null) {
- String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
- " on node=%s", sourceCollection, sourceReplica.getStr(SHARD_ID_PROP), target);
- log.warn(errorString);
- // one replica creation failed. Make the best attempt to
- // delete all the replicas created so far in the target
- // and exit
- synchronized (results) {
- results.add("failure", errorString);
- anyOneFailed.set(true);
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Successfully created replica for collection={} shard={} on node={}",
- sourceCollection, sourceReplica.getStr(SHARD_ID_PROP), target);
- }
+
+ for (ZkNodeProps sourceReplica : sourceReplicas) {
+ String sourceCollection = sourceReplica.getStr(COLLECTION_PROP);
+ if (log.isInfoEnabled()) {
+ log.info("Going to create replica for collection={} shard={} on node={}", sourceCollection, sourceReplica.getStr(SHARD_ID_PROP), target);
+ }
+ String targetNode = target;
+ if (targetNode == null) {
+ Replica.Type replicaType = Replica.Type.get(sourceReplica.getStr(ZkStateReader.REPLICA_TYPE));
+ int numNrtReplicas = replicaType == Replica.Type.NRT ? 1 : 0;
+ int numTlogReplicas = replicaType == Replica.Type.TLOG ? 1 : 0;
+ int numPullReplicas = replicaType == Replica.Type.PULL ? 1 : 0;
+ Assign.AssignRequest assignRequest = new Assign.AssignRequestBuilder()
+ .forCollection(sourceCollection)
+ .forShard(Collections.singletonList(sourceReplica.getStr(SHARD_ID_PROP)))
+ .assignNrtReplicas(numNrtReplicas)
+ .assignTlogReplicas(numTlogReplicas)
+ .assignPullReplicas(numPullReplicas)
+ .onNodes(ccc.getSolrCloudManager().getClusterStateProvider().getLiveNodes().stream().filter(node -> !node.equals(source)).collect(Collectors.toList()))
+ .build();
+ Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(
+ ccc.getCoreContainer(),
+ clusterState, clusterState.getCollection(sourceCollection));
+ targetNode = assignStrategy.assign(ccc.getSolrCloudManager(), assignRequest).get(0).node;
+ }
+ ZkNodeProps msg = sourceReplica.plus("parallel", String.valueOf(parallel)).plus(CoreAdminParams.NODE, targetNode);
+ if (async != null) msg.getProperties().put(ASYNC, async);
+ NamedList<Object> nl = new NamedList<>();
+ final ZkNodeProps addedReplica = new AddReplicaCmd(ccc).addReplica(clusterState,
+ msg, nl, () -> {
+ countDownLatch.countDown();
+ if (nl.get("failure") != null) {
+ String errorString = String.format(Locale.ROOT, "Failed to create replica for collection=%s shard=%s" +
+ " on node=%s", sourceCollection, sourceReplica.getStr(SHARD_ID_PROP), target);
+ log.warn(errorString);
+ // one replica creation failed. Make the best attempt to
+ // delete all the replicas created so far in the target
+ // and exit
+ synchronized (results) {
+ results.add("failure", errorString);
+ anyOneFailed.set(true);
}
- }).get(0);
-
- if (addedReplica != null) {
- createdReplicas.add(addedReplica);
- if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) {
- String shardName = sourceReplica.getStr(SHARD_ID_PROP);
- String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
- String collectionName = sourceCollection;
- String key = collectionName + "_" + replicaName;
- CollectionStateWatcher watcher;
- if (waitForFinalState) {
- watcher = new ActiveReplicaWatcher(collectionName, null,
- Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)), replicasToRecover);
} else {
- watcher = new LeaderRecoveryWatcher(collectionName, shardName, replicaName,
- addedReplica.getStr(ZkStateReader.CORE_NAME_PROP), replicasToRecover);
+ if (log.isDebugEnabled()) {
+ log.debug("Successfully created replica for collection={} shard={} on node={}",
+ sourceCollection, sourceReplica.getStr(SHARD_ID_PROP), target);
+ }
}
- watchers.put(key, watcher);
- log.debug("--- adding {}, {}", key, watcher);
- zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
+ }).get(0);
+
+ if (addedReplica != null) {
+ createdReplicas.add(addedReplica);
+ if (sourceReplica.getBool(ZkStateReader.LEADER_PROP, false) || waitForFinalState) {
+ String shardName = sourceReplica.getStr(SHARD_ID_PROP);
+ String replicaName = sourceReplica.getStr(ZkStateReader.REPLICA_PROP);
+ String collectionName = sourceCollection;
+ String key = collectionName + "_" + replicaName;
+ CollectionStateWatcher watcher;
+ if (waitForFinalState) {
+ watcher = new ActiveReplicaWatcher(collectionName, null,
+ Collections.singletonList(addedReplica.getStr(ZkStateReader.CORE_NAME_PROP)), replicasToRecover);
} else {
- log.debug("--- not waiting for {}", addedReplica);
+ watcher = new LeaderRecoveryWatcher(collectionName, shardName, replicaName,
+ addedReplica.getStr(ZkStateReader.CORE_NAME_PROP), replicasToRecover);
}
+ watchers.put(key, watcher);
+ log.debug("--- adding {}, {}", key, watcher);
+ zkStateReader.registerCollectionStateWatcher(collectionName, watcher);
+ } else {
+ log.debug("--- not waiting for {}", addedReplica);
}
}
+ }
- log.debug("Waiting for replicas to be added");
- if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
- log.info("Timed out waiting for replicas to be added");
- anyOneFailed.set(true);
- } else {
- log.debug("Finished waiting for replicas to be added");
- }
- } finally {
+ log.debug("Waiting for replicas to be added");
+ if (!countDownLatch.await(timeout, TimeUnit.SECONDS)) {
+ log.info("Timed out waiting for replicas to be added");
+ anyOneFailed.set(true);
+ } else {
+ log.debug("Finished waiting for replicas to be added");
}
+
// now wait for leader replicas to recover
log.debug("Waiting for {} leader replicas to recover", numLeaders);
if (!replicasToRecover.await(timeout, TimeUnit.SECONDS)) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
index 7c0d165..917603a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
@@ -22,7 +22,6 @@ import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -35,6 +34,7 @@ import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest;
import org.apache.solr.client.solrj.response.CoreAdminResponse;
import org.apache.solr.client.solrj.response.RequestStatusState;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
@@ -44,6 +44,7 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.metrics.MetricsMap;
import org.apache.solr.metrics.SolrMetricManager;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
@@ -54,9 +55,12 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
@BeforeClass
public static void setupCluster() throws Exception {
System.setProperty("metricsEnabled", "true");
- configureCluster(6)
- .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
- .configure();
+ }
+
+ @Before
+ public void clearPreviousCluster() throws Exception {
+ // Clear the previous cluster before each test, since they use different numbers of nodes.
+ shutdownCluster();
}
protected String getSolrXml() {
@@ -65,6 +69,9 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
@Test
public void test() throws Exception {
+ configureCluster(6)
+ .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
+ .configure();
String coll = "replacenodetest_coll";
if (log.isInfoEnabled()) {
log.info("total_jettys: {}", cluster.getJettySolrRunners().size());
@@ -79,23 +86,23 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
CollectionAdminRequest.Create create;
// NOTE: always using the createCollection that takes in 'int' for all types of replicas, so we never
// have to worry about null checking when comparing the Create command with the final Slices
-
+
// TODO: tlog replicas do not work correctly in tests due to fault TestInjection#waitForInSyncWithLeader
create = pickRandom(
- CollectionAdminRequest.createCollection(coll, "conf1", 5, 2,0,0),
- //CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,1,0),
- //CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,1,1),
- //CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,0,1),
- //CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,2,0),
- // check also replicationFactor 1
- CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,0,0)
- //CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,1,0)
+ CollectionAdminRequest.createCollection(coll, "conf1", 5, 2,0,0),
+ //CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,1,0),
+ //CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,1,1),
+ //CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,0,1),
+ //CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,2,0),
+ // check also replicationFactor 1
+ CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,0,0)
+ //CollectionAdminRequest.createCollection(coll, "conf1", 5, 0,1,0)
);
create.setCreateNodeSet(StrUtils.join(l, ','));
cloudClient.request(create);
-
+
cluster.waitForActiveCollection(coll, 5, 5 * (create.getNumNrtReplicas() + create.getNumPullReplicas() + create.getNumTlogReplicas()));
-
+
DocCollection collection = cloudClient.getZkStateReader().getClusterState().getCollection(coll);
log.debug("### Before decommission: {}", collection);
log.info("excluded_node : {} ", emptyNode);
@@ -108,13 +115,13 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
success = true;
break;
}
- assertFalse(rsp.getRequestStatus() == RequestStatusState.FAILED);
+ assertNotSame(rsp.getRequestStatus(), RequestStatusState.FAILED);
Thread.sleep(50);
}
assertTrue(success);
try (HttpSolrClient coreclient = getHttpSolrClient(cloudClient.getZkStateReader().getBaseUrlForNodeName(node2bdecommissioned))) {
CoreAdminResponse status = CoreAdminRequest.getStatus(null, coreclient);
- assertTrue(status.getCoreStatus().size() == 0);
+ assertEquals(0, status.getCoreStatus().size());
}
Thread.sleep(5000);
@@ -139,7 +146,7 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
success = true;
break;
}
- assertFalse(rsp.getRequestStatus() == RequestStatusState.FAILED);
+ assertNotSame(rsp.getRequestStatus(), RequestStatusState.FAILED);
Thread.sleep(50);
}
assertTrue(success);
@@ -157,14 +164,7 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
}
// make sure all newly created replicas on node are active
List<Replica> newReplicas = collection.getReplicas(node2bdecommissioned);
- replicas.forEach(r -> {
- for (Iterator<Replica> it = newReplicas.iterator(); it.hasNext(); ) {
- Replica nr = it.next();
- if (nr.getName().equals(r.getName())) {
- it.remove();
- }
- }
- });
+ replicas.forEach(r -> newReplicas.removeIf(nr -> nr.getName().equals(r.getName())));
assertFalse(newReplicas.isEmpty());
for (Replica r : newReplicas) {
assertEquals(r.toString(), Replica.State.ACTIVE, r.getState());
@@ -173,7 +173,7 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
replicas = collection.getReplicas(emptyNode);
if (replicas != null) {
for (Replica r : replicas) {
- assertFalse(r.toString(), Replica.State.ACTIVE.equals(r.getState()));
+ assertNotEquals(r.toString(), Replica.State.ACTIVE, r.getState());
}
}
@@ -210,6 +210,25 @@ public class ReplaceNodeTest extends SolrCloudTestCase {
}
+ @Test
+ public void testFailOnSingleNode() throws Exception {
+ configureCluster(1)
+ .addConfig("conf1", TEST_PATH().resolve("configsets").resolve("cloud-dynamic").resolve("conf"))
+ .configure();
+ String coll = "replacesinglenodetest_coll";
+ if (log.isInfoEnabled()) {
+ log.info("total_jettys: {}", cluster.getJettySolrRunners().size());
+ }
+
+ CloudSolrClient cloudClient = cluster.getSolrClient();
+ cloudClient.request(CollectionAdminRequest.createCollection(coll, "conf1", 5, 1,0,0));
+
+ cluster.waitForActiveCollection(coll, 5, 5);
+
+ String liveNode = cloudClient.getZkStateReader().getClusterState().getLiveNodes().iterator().next();
+ expectThrows(SolrException.class, () -> createReplaceNodeRequest(liveNode, null, null).process(cloudClient));
+ }
+
public static CollectionAdminRequest.AsyncCollectionAdminRequest createReplaceNodeRequest(String sourceNode, String targetNode, Boolean parallel) {
if (random().nextBoolean()) {
return new CollectionAdminRequest.ReplaceNode(sourceNode, targetNode).setParallel(parallel);