You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@solr.apache.org by is...@apache.org on 2022/10/18 05:46:34 UTC

[solr] branch branch_9x updated: SOLR-16453: Overseer doesn't handle PRS and non-PRS messages properly (#1055)

This is an automated email from the ASF dual-hosted git repository.

ishan pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new 3adf13cd0a2 SOLR-16453: Overseer doesn't handle PRS and non-PRS messages properly (#1055)
3adf13cd0a2 is described below

commit 3adf13cd0a2c51c6cd0ceebe32b7aef3d7ca8c52
Author: Ishan Chattopadhyaya <is...@apache.org>
AuthorDate: Tue Oct 18 11:12:21 2022 +0530

    SOLR-16453: Overseer doesn't handle PRS and non-PRS messages properly (#1055)
    
    closes #1055
    
    Co-authored-by: Justin Sweeney (justinsweeney@fullstory.com)
---
 solr/CHANGES.txt                                   |   2 +
 .../apache/solr/cloud/overseer/ZkStateWriter.java  |  33 ++++-
 .../solr/cloud/overseer/ZkStateWriterTest.java     | 139 +++++++++++++++++++++
 3 files changed, 169 insertions(+), 5 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 9e30e56142c..300cd1d6d9f 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -213,6 +213,8 @@ Bug Fixes
 
 * SOLR-16440: RefreshCollectionMessage in PRS should update Overseer's ZkStateWriter (noble, Ishan Chattopadhyaya)
 
+* SOLR-16453: Overseer doesn't handle PRS and non-PRS messages properly (Justin Sweeney, Hitesh Khamesra)
+
 Other Changes
 ---------------------
 * SOLR-16351: Upgrade Carrot2 to 4.4.3, upgrade randomizedtesting to 2.8.0. (Dawid Weiss)
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 44d3be57c4b..8fb0ce287fc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -22,6 +22,7 @@ import com.codahale.metrics.Timer;
 import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
@@ -203,11 +204,11 @@ public class ZkStateWriter {
       throws IllegalStateException, KeeperException, InterruptedException {
     Map<String, ZkWriteCommand> commands = new HashMap<>();
     commands.put(command.name, command);
-    return writePendingUpdates(commands);
+    return writePendingUpdates(commands, false);
   }
 
   public ClusterState writePendingUpdates() throws KeeperException, InterruptedException {
-    return writePendingUpdates(updates);
+    return writePendingUpdates(updates, true);
   }
   /**
    * Writes all pending updates to ZooKeeper and returns the modified cluster state
@@ -217,13 +218,29 @@ public class ZkStateWriter {
    * @throws KeeperException if any ZooKeeper operation results in an error
    * @throws InterruptedException if the current thread is interrupted
    */
-  public ClusterState writePendingUpdates(Map<String, ZkWriteCommand> updates)
+  public ClusterState writePendingUpdates(
+      Map<String, ZkWriteCommand> updates, boolean resetPendingUpdateCounters)
       throws IllegalStateException, KeeperException, InterruptedException {
     if (invalidState) {
       throw new IllegalStateException(
           "ZkStateWriter has seen a tragic error, this instance can no longer be used");
     }
+
+    if (log.isDebugEnabled()) {
+      log.debug(
+          String.format(
+              Locale.ROOT,
+              "Request to write pending updates with updates of length: %d, "
+                  + "pending updates of length: %d, writing all pending updates: %b",
+              updates.size(),
+              this.updates.size(),
+              updates == this.updates));
+    }
+
     if ((updates == this.updates) && !hasPendingUpdates()) {
+      if (log.isDebugEnabled()) {
+        log.debug("Attempted to flush all pending updates, but there are no pending updates");
+      }
       return clusterState;
     }
     Timer.Context timerContext = stats.time("update_state");
@@ -295,10 +312,11 @@ public class ZkStateWriter {
         }
 
         updates.clear();
-        numUpdates = 0;
       }
 
-      lastUpdatedTime = System.nanoTime();
+      if (resetPendingUpdateCounters) {
+        resetPendingUpdateCounters();
+      }
       success = true;
     } catch (KeeperException.BadVersionException bve) {
       // this is a tragic error, we must disallow usage of this instance
@@ -317,6 +335,11 @@ public class ZkStateWriter {
     return clusterState;
   }
 
+  public void resetPendingUpdateCounters() {
+    lastUpdatedTime = System.nanoTime();
+    numUpdates = 0;
+  }
+
   /**
    * @return the most up-to-date cluster state until the last enqueueUpdate operation
    */
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
index ca7577ab3f9..be38ec88fe6 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
@@ -124,6 +124,145 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
     }
   }
 
+  public void testZkStateWriterPendingAndNonBatchedTimeExceeded() throws Exception {
+    Path zkDir = createTempDir("testZkStateWriterBatching");
+
+    ZkTestServer server = new ZkTestServer(zkDir);
+
+    SolrZkClient zkClient = null;
+
+    try {
+      server.run();
+
+      zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
+      ZkController.createClusterZkNodes(zkClient);
+
+      try (ZkStateReader reader = new ZkStateReader(zkClient)) {
+        reader.createClusterStateWatchersAndUpdate();
+
+        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
+        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c3", true);
+        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/prs1", true);
+
+        ZkWriteCommand c1 =
+            new ZkWriteCommand(
+                "c1",
+                new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
+        ZkWriteCommand c2 =
+            new ZkWriteCommand(
+                "c2",
+                new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
+        ZkWriteCommand c3 =
+            new ZkWriteCommand(
+                "c3",
+                new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
+        Map<String, Object> prsProps = new HashMap<>();
+        prsProps.put("perReplicaState", Boolean.TRUE);
+        ZkWriteCommand prs1 =
+            new ZkWriteCommand(
+                "prs1", new DocCollection("prs1", new HashMap<>(), prsProps, DocRouter.DEFAULT, 0));
+        ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
+
+        // First write is flushed immediately
+        ClusterState clusterState =
+            writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
+        clusterState =
+            writer.enqueueUpdate(clusterState, Collections.singletonList(c1), FAIL_ON_WRITE);
+        clusterState =
+            writer.enqueueUpdate(clusterState, Collections.singletonList(c2), FAIL_ON_WRITE);
+
+        Thread.sleep(Overseer.STATE_UPDATE_DELAY + 100);
+        AtomicBoolean didWrite = new AtomicBoolean(false);
+        clusterState =
+            writer.enqueueUpdate(
+                clusterState, Collections.singletonList(prs1), () -> didWrite.set(true));
+        assertTrue("Exceed the update delay, should be flushed", didWrite.get());
+        didWrite.set(false);
+        clusterState =
+            writer.enqueueUpdate(
+                clusterState, Collections.singletonList(c3), () -> didWrite.set(true));
+        assertTrue("Exceed the update delay, should be flushed", didWrite.get());
+        assertTrue(
+            "The updates queue should be empty having been flushed", writer.updates.isEmpty());
+        didWrite.set(false);
+      }
+
+    } finally {
+      IOUtils.close(zkClient);
+      server.shutdown();
+    }
+  }
+
+  public void testZkStateWriterPendingAndNonBatchedBatchSizeExceeded() throws Exception {
+    Path zkDir = createTempDir("testZkStateWriterBatching");
+
+    ZkTestServer server = new ZkTestServer(zkDir);
+
+    SolrZkClient zkClient = null;
+
+    try {
+      server.run();
+
+      zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
+      ZkController.createClusterZkNodes(zkClient);
+
+      try (ZkStateReader reader = new ZkStateReader(zkClient)) {
+        reader.createClusterStateWatchersAndUpdate();
+
+        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
+        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c3", true);
+        zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/prs1", true);
+
+        ZkWriteCommand c1 =
+            new ZkWriteCommand(
+                "c1",
+                new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
+        ZkWriteCommand c2 =
+            new ZkWriteCommand(
+                "c2",
+                new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
+        ZkWriteCommand c3 =
+            new ZkWriteCommand(
+                "c3",
+                new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
+        Map<String, Object> prsProps = new HashMap<>();
+        prsProps.put("perReplicaState", Boolean.TRUE);
+        ZkWriteCommand prs1 =
+            new ZkWriteCommand(
+                "prs1", new DocCollection("prs1", new HashMap<>(), prsProps, DocRouter.DEFAULT, 0));
+        ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
+
+        // First write is flushed immediately
+        ClusterState clusterState =
+            writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
+
+        AtomicBoolean didWrite = new AtomicBoolean(false);
+        AtomicBoolean didWritePrs = new AtomicBoolean(false);
+        for (int i = 0; i <= Overseer.STATE_UPDATE_BATCH_SIZE; i++) {
+          clusterState =
+              writer.enqueueUpdate(
+                  clusterState, Collections.singletonList(c3), () -> didWrite.set(true));
+          // Write a PRS update in the middle and make sure we still get the right results
+          if (i == (Overseer.STATE_UPDATE_BATCH_SIZE / 2)) {
+            clusterState =
+                writer.enqueueUpdate(
+                    clusterState, Collections.singletonList(prs1), () -> didWritePrs.set(true));
+          }
+        }
+        assertTrue("Exceed the update batch size, should be flushed", didWrite.get());
+        assertTrue("PRS update should always be written", didWritePrs.get());
+        assertTrue(
+            "The updates queue should be empty having been flushed", writer.updates.isEmpty());
+      }
+
+    } finally {
+      IOUtils.close(zkClient);
+      server.shutdown();
+    }
+  }
+
   public void testSingleExternalCollection() throws Exception {
     Path zkDir = createTempDir("testSingleExternalCollection");