You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ma...@apache.org on 2023/09/23 00:46:58 UTC

[solr] branch branch_9x updated: SOLR-16701: Race condition on PRS enabled collection deletion (#1460)

This is an automated email from the ASF dual-hosted git repository.

magibney pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new f1ba2c2cb80 SOLR-16701: Race condition on PRS enabled collection deletion (#1460)
f1ba2c2cb80 is described below

commit f1ba2c2cb8050ad958326141d86e573a28ab0516
Author: patsonluk <pa...@users.noreply.github.com>
AuthorDate: Fri Sep 22 17:21:46 2023 -0700

    SOLR-16701: Race condition on PRS enabled collection deletion (#1460)
    
    (cherry picked from commit f22a51cc64f83f7b1268d9f3a4c50e36249bdd87)
---
 solr/CHANGES.txt                                   |   2 +
 .../solr/cloud/overseer/ZkStateReaderTest.java     | 120 +++++++++++++++++++++
 .../solr/common/cloud/PerReplicaStatesOps.java     |  14 +++
 .../apache/solr/common/cloud/ZkStateReader.java    |  15 +++
 .../solr/common/util/CommonTestInjection.java      |  96 +++++++++++++++++
 5 files changed, 247 insertions(+)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 9978cea69ff..ba6d4cf9855 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -147,6 +147,8 @@ Bug Fixes
 
 * SOLR-16925: Fix indentation for JacksonJsonWriter (Houston Putman)
 
+* SOLR-16701: Fix race condition on PRS enabled collection deletion (Patson Luk)
+
 Dependency Upgrades
 ---------------------
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
index 6e1f4221943..3e1ca33963c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
@@ -43,16 +43,19 @@ import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.PerReplicaStates;
 import org.apache.solr.common.cloud.PerReplicaStatesOps;
 import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.CommonTestInjection;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
 import org.apache.solr.common.util.ZLibCompressor;
 import org.apache.solr.handler.admin.ConfigSetsHandler;
 import org.apache.solr.util.LogLevel;
 import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
 import org.junit.After;
 import org.junit.Before;
 import org.slf4j.Logger;
@@ -664,4 +667,121 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
       ExecutorUtil.awaitTermination(executorService);
     }
   }
+
+  /**
+   * Ensures that fetching collection state (getCollectionLive etc.) does not throw when
+   * state.json is deleted between the state.json read and the PRS entries read.
+   */
+  public void testDeletePrsCollection() throws Exception {
+    ZkStateWriter writer = fixture.writer;
+    ZkStateReader reader = fixture.reader;
+
+    String collectionName = "c1";
+    fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName, true);
+
+    ClusterState clusterState = reader.getClusterState();
+
+    String nodeName = "node1:10000_solr";
+    String sliceName = "shard1";
+    Slice slice = new Slice(sliceName, Map.of(), Map.of(), collectionName);
+
+    // create new collection
+    DocCollection state =
+        DocCollection.create(
+            collectionName,
+            Map.of(sliceName, slice),
+            Collections.singletonMap(DocCollection.CollectionStateProps.PER_REPLICA_STATE, true),
+            DocRouter.DEFAULT,
+            0,
+            PerReplicaStatesOps.getZkClientPrsSupplier(
+                fixture.zkClient, DocCollection.getCollectionPath(collectionName)));
+    ZkWriteCommand wc = new ZkWriteCommand(collectionName, state);
+    writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
+    clusterState = writer.writePendingUpdates();
+
+    TimeOut timeOut = new TimeOut(5000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+    timeOut.waitFor(
+        "Timeout on waiting for c1 to show up in cluster state",
+        () -> reader.getClusterState().getCollectionOrNull(collectionName) != null);
+
+    String collectionPath = ZkStateReader.getCollectionPath(collectionName);
+
+    // now create the replica. This must happen after the DocCollection is created with an empty
+    // slice; otherwise the DocCollection ctor would fetch the PRS entries and throw exceptions
+    String replicaBaseUrl = Utils.getBaseUrlForNodeName(nodeName, "http");
+
+    String replicaName = "replica1";
+    Replica replica =
+        new Replica(
+            replicaName,
+            Map.of(
+                ZkStateReader.CORE_NAME_PROP,
+                "core1",
+                ZkStateReader.STATE_PROP,
+                Replica.State.ACTIVE.toString(),
+                ZkStateReader.NODE_NAME_PROP,
+                nodeName,
+                ZkStateReader.BASE_URL_PROP,
+                replicaBaseUrl,
+                ZkStateReader.REPLICA_TYPE,
+                Replica.Type.NRT.name()),
+            collectionName,
+            sliceName);
+
+    wc =
+        new ZkWriteCommand(
+            collectionName, SliceMutator.updateReplica(state, slice, replica.getName(), replica));
+    writer.enqueueUpdate(clusterState, Collections.singletonList(wc), null);
+    writer.writePendingUpdates();
+
+    timeOut.waitFor(
+        "Timeout on waiting for replica to show up in cluster state",
+        () ->
+            reader.getCollectionLive(collectionName).getSlice(sliceName).getReplica(replicaName)
+                != null);
+
+    try (CommonTestInjection.BreakpointSetter breakpointSetter =
+        new CommonTestInjection.BreakpointSetter()) {
+      // set breakpoint such that after state.json fetch and before PRS entry fetch, we can delete
+      // the state.json and PRS entries to trigger the race condition
+      breakpointSetter.setImplementation(
+          PerReplicaStatesOps.class.getName() + "/beforePrsFetch",
+          (args) -> {
+            try {
+              // this is invoked after ZkStateReader.fetchCollectionState has fetched the state.json
+              // but before PRS entries.
+              // call delete state.json on ZK directly, very tricky to control execution order with
+              // writer.enqueueUpdate
+              reader.getZkClient().clean(collectionPath);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new RuntimeException(e);
+            } catch (KeeperException e) {
+              throw new RuntimeException(e);
+            }
+          });
+
+      // set breakpoint to verify the expected PrsZkNodeNotFoundException is indeed thrown within
+      // the execution flow, such exception is caught within the logic and not thrown to the
+      // caller
+      AtomicBoolean prsZkNodeNotFoundExceptionThrown = new AtomicBoolean(false);
+      breakpointSetter.setImplementation(
+          ZkStateReader.class.getName() + "/exercised",
+          (args) -> {
+            if (args[0] instanceof PerReplicaStatesOps.PrsZkNodeNotFoundException) {
+              prsZkNodeNotFoundExceptionThrown.set(true);
+            }
+          });
+
+      timeOut.waitFor(
+          "Timeout waiting for collection state to become null",
+          () -> {
+            // this should not throw exception even if the PRS entry read is delayed artificially
+            // (by previous command) and deleted after the following getCollectionLive call
+            return reader.getCollectionLive(collectionName) == null;
+          });
+
+      assertTrue("PRS not-found handling not exercised", prsZkNodeNotFoundExceptionThrown.get());
+    }
+  }
 }
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
index 475b852e75c..dd0b14fd3e1 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
@@ -27,6 +27,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.function.Function;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.CommonTestInjection;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
@@ -56,6 +57,8 @@ public class PerReplicaStatesOps {
   public static PerReplicaStates fetch(
       String path, SolrZkClient zkClient, PerReplicaStates current) {
     try {
+      assert CommonTestInjection.injectBreakpoint(
+          PerReplicaStatesOps.class.getName() + "/beforePrsFetch");
       if (current != null) {
         Stat stat = zkClient.exists(current.path, null, true);
         if (stat == null) return new PerReplicaStates(path, 0, Collections.emptyList());
@@ -64,6 +67,11 @@ public class PerReplicaStatesOps {
       Stat stat = new Stat();
       List<String> children = zkClient.getChildren(path, null, stat, true);
       return new PerReplicaStates(path, stat.getCversion(), Collections.unmodifiableList(children));
+    } catch (KeeperException.NoNodeException e) {
+      throw new PrsZkNodeNotFoundException(
+          SolrException.ErrorCode.SERVER_ERROR,
+          "Error fetching per-replica states. The node [" + path + "] is not found",
+          e);
     } catch (KeeperException e) {
       throw new SolrException(
           SolrException.ErrorCode.SERVER_ERROR, "Error fetching per-replica states", e);
@@ -76,6 +84,12 @@ public class PerReplicaStatesOps {
     }
   }
 
+  public static class PrsZkNodeNotFoundException extends SolrException {
+    private PrsZkNodeNotFoundException(ErrorCode errorCode, String message, Throwable cause) {
+      super(errorCode, message, cause);
+    }
+  }
+
   public static DocCollection.PrsSupplier getZkClientPrsSupplier(
       SolrZkClient zkClient, String collectionPath) {
     return () -> fetch(collectionPath, zkClient, null);
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index d8115fdf1b8..17f7bdbc5d5 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -1627,6 +1627,21 @@ public class ZkStateReader implements SolrCloseable {
           }
         }
         return null;
+      } catch (PerReplicaStatesOps.PrsZkNodeNotFoundException e) {
+        assert CommonTestInjection.injectBreakpoint(
+            ZkStateReader.class.getName() + "/exercised", e);
+        // could be a race condition that state.json and PRS entries are deleted between the
+        // state.json fetch and PRS entry fetch
+        Stat exists = zkClient.exists(collectionPath, watcher, true);
+        if (exists == null) {
+          log.info(
+              "PRS entry for collection {} not found in ZK. It was probably deleted between state.json read and PRS entry read.",
+              coll);
+
+          return null;
+        } else {
+          throw e; // unexpected, PRS node not found but the collection state.json still exists
+        }
       }
     }
   }
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java b/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java
index e1d7caa8dd9..bcacd715a71 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/CommonTestInjection.java
@@ -17,8 +17,14 @@
 
 package org.apache.solr.common.util;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,6 +38,7 @@ public class CommonTestInjection {
 
   private static volatile Map<String, String> additionalSystemProps = null;
   private static volatile Integer delay = null;
+  private static final ConcurrentMap<String, Breakpoint> breakpoints = new ConcurrentHashMap<>();
 
   public static void reset() {
     additionalSystemProps = null;
@@ -73,4 +80,93 @@ public class CommonTestInjection {
     }
     return true;
   }
+
+  /**
+   * Injects a breakpoint that pauses the existing code execution, executes the code defined in the
+   * breakpoint implementation and then resumes afterward. The breakpoint implementation is looked
+   * up by the corresponding key used in {@link BreakpointSetter#setImplementation(String,
+   * Breakpoint)}; if none is registered for the key, nothing is executed.
+   *
+   * <p>Example usages:
+   *
+   * <ol>
+   *   <li>Inject a precise wait until a race condition is fulfilled before proceeding with original
+   *       code execution
+   *   <li>Inject a flag into a catch block that handles the exception without re-throwing, so a
+   *       test can verify that the caught exception was actually triggered
+   * </ol>
+   *
+   * <p>This should always be part of an assert statement (i.e. {@code assert
+   * injectBreakpoint(key)}) so that it is skipped when assertions are disabled.
+   *
+   * @see BreakpointSetter#setImplementation(String, Breakpoint)
+   * @param key could simply be the fully qualified class name or more granular like class name +
+   *     other id (such as method name); must match the key used with {@link BreakpointSetter}
+   * @param args optional arguments list to be passed to the Breakpoint
+   * @return always {@code true}, so that the call can be used as an {@code assert} condition
+   */
+  public static boolean injectBreakpoint(String key, Object... args) {
+    Breakpoint breakpoint = breakpoints.get(key);
+    if (breakpoint == null) {
+      log.debug(
+          "Breakpoint with key {} is triggered but there's no implementation set. Skipping...",
+          key);
+      return true;
+    }
+    log.info("Breakpoint with key {} is triggered", key);
+    breakpoint.executeAndResume(args);
+    log.info("Breakpoint with key {} was executed and normal code execution resumes", key);
+    return true;
+  }
+
+  @FunctionalInterface
+  public interface Breakpoint {
+    /**
+     * Invoked where the breakpoint was injected; the paused code resumes once this method returns.
+     */
+    void executeAndResume(Object... args);
+  }
+
+  /**
+   * Breakpoints should be set via this {@link BreakpointSetter} within the test case and close
+   * should be invoked as cleanup. Since this is closeable, it should usually be used in a
+   * try-with-resources statement, such as:
+   *
+   * <pre>{@code
+   * try (BreakpointSetter breakpointSetter = new BreakpointSetter()) {
+   *     //... test code here that calls breakpointSetter.setImplementation(...)
+   * }
+   * }</pre>
+   */
+  public static class BreakpointSetter implements Closeable {
+    private final Set<String> keys = new HashSet<>();
+    /**
+     * Registers a breakpoint implementation; this is usually called by test cases.
+     *
+     * <p>Once set, code execution breaks at the point marked by {@link
+     * CommonTestInjection#injectBreakpoint(String, Object...)} with a matching key, executes the
+     * provided {@link Breakpoint}, then resumes normal code execution.
+     *
+     * @see CommonTestInjection#injectBreakpoint(String, Object...)
+     * @param key could simply be the fully qualified class name or more granular like class name +
+     *     other id (such as method name). This must match the key used in injectBreakpoint
+     * @param implementation The Breakpoint implementation
+     * @throws IllegalArgumentException if an implementation is already registered for the key
+     */
+    public void setImplementation(String key, Breakpoint implementation) {
+      // putIfAbsent checks and registers atomically, unlike containsKey followed by put
+      if (breakpoints.putIfAbsent(key, implementation) != null) {
+        throw new IllegalArgumentException(
+            "Cannot redefine Breakpoint implementation with key " + key);
+      }
+      keys.add(key);
+    }
+
+    @Override
+    public void close() throws IOException {
+      for (String key : keys) {
+        breakpoints.remove(key);
+      }
+    }
+  }
 }