You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ma...@apache.org on 2023/09/23 00:46:58 UTC
[solr] branch branch_9x updated: SOLR-16701: Race condition on PRS enabled collection deletion (#1460)
This is an automated email from the ASF dual-hosted git repository.
magibney pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_9x by this push:
new f1ba2c2cb80 SOLR-16701: Race condition on PRS enabled collection deletion (#1460)
f1ba2c2cb80 is described below
commit f1ba2c2cb8050ad958326141d86e573a28ab0516
Author: patsonluk <pa...@users.noreply.github.com>
AuthorDate: Fri Sep 22 17:21:46 2023 -0700
SOLR-16701: Race condition on PRS enabled collection deletion (#1460)
(cherry picked from commit f22a51cc64f83f7b1268d9f3a4c50e36249bdd87)
---
solr/CHANGES.txt | 2 +
.../solr/cloud/overseer/ZkStateReaderTest.java | 120 +++++++++++++++++++++
.../solr/common/cloud/PerReplicaStatesOps.java | 14 +++
.../apache/solr/common/cloud/ZkStateReader.java | 15 +++
.../solr/common/util/CommonTestInjection.java | 96 +++++++++++++++++
5 files changed, 247 insertions(+)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 9978cea69ff..ba6d4cf9855 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -147,6 +147,8 @@ Bug Fixes
* SOLR-16925: Fix indentation for JacksonJsonWriter (Houston Putman)
+* SOLR-16701: Fix race condition on PRS enabled collection deletion (Patson Luk)
+
Dependency Upgrades
---------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
index 6e1f4221943..3e1ca33963c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
@@ -43,16 +43,19 @@ import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.PerReplicaStatesOps;
import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.CommonTestInjection;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
import org.apache.solr.common.util.ZLibCompressor;
import org.apache.solr.handler.admin.ConfigSetsHandler;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
@@ -664,4 +667,121 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
ExecutorUtil.awaitTermination(executorService);
}
}
+
+ /**
+ * Ensures that collection state fetching (getCollectionLive etc.) does not throw an exception
+ * when the state.json is deleted between the state.json read and the PRS entries read.
+ *
+ * <p>The test creates a PRS enabled collection with a single replica, then uses breakpoints to
+ * delete the collection's ZK nodes right before the PRS fetch and to record that the resulting
+ * PrsZkNodeNotFoundException reaches the handler in ZkStateReader.
+ */
+ public void testDeletePrsCollection() throws Exception {
+ ZkStateWriter writer = fixture.writer;
+ ZkStateReader reader = fixture.reader;
+
+ String collectionName = "c1";
+ fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName, true);
+
+ ClusterState clusterState = reader.getClusterState();
+
+ String nodeName = "node1:10000_solr";
+ String sliceName = "shard1";
+ Slice slice = new Slice(sliceName, Map.of(), Map.of(), collectionName);
+
+ // create new collection with PER_REPLICA_STATE enabled and a single, still empty, slice
+ DocCollection state =
+ DocCollection.create(
+ collectionName,
+ Map.of(sliceName, slice),
+ Collections.singletonMap(DocCollection.CollectionStateProps.PER_REPLICA_STATE, true),
+ DocRouter.DEFAULT,
+ 0,
+ PerReplicaStatesOps.getZkClientPrsSupplier(
+ fixture.zkClient, DocCollection.getCollectionPath(collectionName)));
+ ZkWriteCommand wc = new ZkWriteCommand(collectionName, state);
+ writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
+ clusterState = writer.writePendingUpdates();
+
+ // the same TimeOut instance is reused by every waitFor call in this test
+ TimeOut timeOut = new TimeOut(5000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+ timeOut.waitFor(
+ "Timeout on waiting for c1 to show up in cluster state",
+ () -> reader.getClusterState().getCollectionOrNull(collectionName) != null);
+
+ String collectionPath = ZkStateReader.getCollectionPath(collectionName);
+
+ // now create the replica, take note that this has to be done after DocCollection creation with
+ // empty slice, otherwise the DocCollection ctor would fetch the PRS entries and throw
+ // exceptions
+ String replicaBaseUrl = Utils.getBaseUrlForNodeName(nodeName, "http");
+
+ String replicaName = "replica1";
+ Replica replica =
+ new Replica(
+ replicaName,
+ Map.of(
+ ZkStateReader.CORE_NAME_PROP,
+ "core1",
+ ZkStateReader.STATE_PROP,
+ Replica.State.ACTIVE.toString(),
+ ZkStateReader.NODE_NAME_PROP,
+ nodeName,
+ ZkStateReader.BASE_URL_PROP,
+ replicaBaseUrl,
+ ZkStateReader.REPLICA_TYPE,
+ Replica.Type.NRT.name()),
+ collectionName,
+ sliceName);
+
+ wc =
+ new ZkWriteCommand(
+ collectionName, SliceMutator.updateReplica(state, slice, replica.getName(), replica));
+ writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
+ clusterState = writer.writePendingUpdates();
+
+ timeOut.waitFor(
+ "Timeout on waiting for replica to show up in cluster state",
+ () ->
+ reader.getCollectionLive(collectionName).getSlice(sliceName).getReplica(replicaName)
+ != null);
+
+ // the setter is closed by try-with-resources, which unregisters both breakpoints set below
+ try (CommonTestInjection.BreakpointSetter breakpointSetter =
+ new CommonTestInjection.BreakpointSetter()) {
+ // set breakpoint such that after state.json fetch and before PRS entry fetch, we can delete
+ // the state.json and PRS entries to trigger the race condition
+ breakpointSetter.setImplementation(
+ PerReplicaStatesOps.class.getName() + "/beforePrsFetch",
+ (args) -> {
+ try {
+ // this is invoked after ZkStateReader.fetchCollectionState has fetched the state.json
+ // but before PRS entries.
+ // call delete state.json on ZK directly, very tricky to control execution order with
+ // writer.enqueueUpdate
+ reader.getZkClient().clean(collectionPath);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (KeeperException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // set breakpoint to verify the expected PrsZkNodeNotFoundException is indeed thrown within
+ // the execution flow, such exception is caught within the logic and not thrown to the
+ // caller
+ AtomicBoolean prsZkNodeNotFoundExceptionThrown = new AtomicBoolean(false);
+ breakpointSetter.setImplementation(
+ ZkStateReader.class.getName() + "/exercised",
+ (args) -> {
+ if (args[0] instanceof PerReplicaStatesOps.PrsZkNodeNotFoundException) {
+ prsZkNodeNotFoundExceptionThrown.set(true);
+ }
+ });
+
+ timeOut.waitFor(
+ "Timeout waiting for collection state to become null",
+ () -> {
+ // this should not throw an exception even though the "beforePrsFetch" breakpoint set
+ // above deletes the collection's ZK nodes after the state.json read and before the PRS
+ // entries read performed by this getCollectionLive call
+ return reader.getCollectionLive(collectionName) == null;
+ });
+
+ // the race must actually have been hit, otherwise the test proves nothing
+ assertTrue(prsZkNodeNotFoundExceptionThrown.get());
+ }
+ }
}
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
index 475b852e75c..dd0b14fd3e1 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
@@ -27,6 +27,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.CommonTestInjection;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
@@ -56,6 +57,8 @@ public class PerReplicaStatesOps {
public static PerReplicaStates fetch(
String path, SolrZkClient zkClient, PerReplicaStates current) {
try {
+ assert CommonTestInjection.injectBreakpoint(
+ PerReplicaStatesOps.class.getName() + "/beforePrsFetch");
if (current != null) {
Stat stat = zkClient.exists(current.path, null, true);
if (stat == null) return new PerReplicaStates(path, 0, Collections.emptyList());
@@ -64,6 +67,11 @@ public class PerReplicaStatesOps {
Stat stat = new Stat();
List<String> children = zkClient.getChildren(path, null, stat, true);
return new PerReplicaStates(path, stat.getCversion(), Collections.unmodifiableList(children));
+ } catch (KeeperException.NoNodeException e) {
+ throw new PrsZkNodeNotFoundException(
+ SolrException.ErrorCode.SERVER_ERROR,
+ "Error fetching per-replica states. The node [" + path + "] is not found",
+ e);
} catch (KeeperException e) {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR, "Error fetching per-replica states", e);
@@ -76,6 +84,12 @@ public class PerReplicaStatesOps {
}
}
+ /**
+ * Thrown by {@link #fetch(String, SolrZkClient, PerReplicaStates)} when ZooKeeper answers the
+ * fetch with a {@link KeeperException.NoNodeException}. Being a dedicated subclass of {@link
+ * SolrException}, it lets callers tell a missing node apart from the other fetch failures,
+ * which are reported as a plain {@link SolrException}.
+ */
+ public static class PrsZkNodeNotFoundException extends SolrException {
+ // private: instances are only meant to be created from within PerReplicaStatesOps
+ private PrsZkNodeNotFoundException(ErrorCode code, String msg, Throwable cause) {
+ super(code, msg, cause);
+ }
+ }
+
public static DocCollection.PrsSupplier getZkClientPrsSupplier(
SolrZkClient zkClient, String collectionPath) {
return () -> fetch(collectionPath, zkClient, null);
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index d8115fdf1b8..17f7bdbc5d5 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -1627,6 +1627,21 @@ public class ZkStateReader implements SolrCloseable {
}
}
return null;
+ } catch (PerReplicaStatesOps.PrsZkNodeNotFoundException e) {
+ assert CommonTestInjection.injectBreakpoint(
+ ZkStateReader.class.getName() + "/exercised", e);
+ // could be a race condition that state.json and PRS entries are deleted between the
+ // state.json fetch and PRS entry fetch
+ Stat exists = zkClient.exists(collectionPath, watcher, true);
+ if (exists == null) {
+ log.info(
+ "PRS entry for collection {} not found in ZK. It was probably deleted between state.json read and PRS entry read.",
+ coll);
+
+ return null;
+ } else {
+ throw e; // unexpected, PRS node not found but the collection state.json still exists
+ }
}
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java b/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java
index e1d7caa8dd9..bcacd715a71 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java
@@ -17,8 +17,14 @@
package org.apache.solr.common.util;
+import java.io.Closeable;
+import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,6 +38,7 @@ public class CommonTestInjection {
private static volatile Map<String, String> additionalSystemProps = null;
private static volatile Integer delay = null;
+ private static final ConcurrentMap<String, Breakpoint> breakpoints = new ConcurrentHashMap<>();
public static void reset() {
additionalSystemProps = null;
@@ -73,4 +80,93 @@ public class CommonTestInjection {
}
return true;
}
+
+ /**
+ * Runs the {@link Breakpoint} registered under {@code key}, if there is one, at the exact point
+ * where this call is placed, and lets the surrounding code continue once the breakpoint
+ * implementation returns. Implementations are registered with {@link
+ * BreakpointSetter#setImplementation(String, Breakpoint)}; when none is registered for the key
+ * this call only emits a debug log entry.
+ *
+ * <p>Example usages:
+ *
+ * <ol>
+ * <li>Inject a precise wait until a race condition is fulfilled before proceeding with the
+ * original code execution
+ * <li>Inject a flag into a catch statement which handles the exception without re-throwing.
+ * This can verify that the caught exception does get triggered
+ * </ol>
+ *
+ * <p>This should always be part of an assert statement (i.e. {@code assert
+ * injectBreakpoint(key)}) such that it is skipped for normal code execution
+ *
+ * @param key could simply be the fully qualified class name or more granular like class name +
+ * other id (such as method name). Implementations should only be set by the corresponding
+ * unit test cases via {@link BreakpointSetter#setImplementation(String, Breakpoint)}
+ * @param args optional arguments list to be passed to the Breakpoint
+ * @return always {@code true}, so that the call can be used as the condition of an assert
+ * @see BreakpointSetter#setImplementation(String, Breakpoint)
+ */
+ public static boolean injectBreakpoint(String key, Object... args) {
+ final Breakpoint registered = breakpoints.get(key);
+ if (registered == null) {
+ // nothing registered for this key: leave a trace and carry on
+ log.debug(
+ "Breakpoint with key {} is triggered but there's no implementation set. Skipping...",
+ key);
+ return true;
+ }
+
+ log.info("Breakpoint with key {} is triggered", key);
+ registered.executeAndResume(args);
+ log.info("Breakpoint with key {} was executed and normal code execution resumes", key);
+ return true;
+ }
+
+ /**
+ * Test-supplied code to run at a point marked with {@link
+ * CommonTestInjection#injectBreakpoint(String, Object...)}. It has a single abstract method, so
+ * it can be implemented with a lambda.
+ */
+ @FunctionalInterface
+ public interface Breakpoint {
+ /**
+ * Code execution breaks where the breakpoint was injected, executes this method and resumes
+ * afterward.
+ *
+ * @param args the optional arguments that were passed to {@link
+ * CommonTestInjection#injectBreakpoint(String, Object...)}
+ */
+ void executeAndResume(Object... args);
+ }
+
+ /**
+ * Breakpoints should be set via this {@link BreakpointSetter} within the test case and close
+ * should be invoked as cleanup. Since this is closeable, it should usually be used in the
+ * try-with-resource syntax, such as:
+ *
+ * <pre>{@code
+ * try (BreakpointSetter breakpointSetter = new BreakpointSetter()) {
+ * //... test code here that calls breakpointSetter.setImplementation(...)
+ * }
+ * }</pre>
+ */
+ public static class BreakpointSetter implements Closeable {
+ // keys registered through this instance, so that close() only removes its own breakpoints
+ private final Set<String> keys = new HashSet<>();
+
+ /**
+ * This is usually set by the test cases.
+ *
+ * <p>If a breakpoint implementation is set by this method, then code execution would break at
+ * the code execution point marked by CommonTestInjection#injectBreakpoint with matching key,
+ * executes the provided implementation in the {@link Breakpoint}, then resumes the normal code
+ * execution.
+ *
+ * @see CommonTestInjection#injectBreakpoint(String, Object...)
+ * @param key could simply be the fully qualified class name or more granular like class name +
+ * other id (such as method name). This should match the key used in injectBreakpoint
+ * @param implementation The Breakpoint implementation
+ * @throws IllegalArgumentException if an implementation is already registered under {@code
+ * key}
+ */
+ public void setImplementation(String key, Breakpoint implementation) {
+ // putIfAbsent performs the duplicate check and the registration as one atomic step, so two
+ // concurrent setters can no longer both pass a containsKey check and overwrite each other
+ if (breakpoints.putIfAbsent(key, implementation) != null) {
+ throw new IllegalArgumentException(
+ "Cannot redefine Breakpoint implementation with key " + key);
+ }
+ keys.add(key);
+ }
+
+ /** Unregisters every breakpoint implementation that was set through this instance. */
+ @Override
+ public void close() throws IOException {
+ for (String key : keys) {
+ breakpoints.remove(key);
+ }
+ }
+ }
}