You are viewing a plain-text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text view.
Posted to commits@solr.apache.org by is...@apache.org on 2022/10/18 05:46:34 UTC
[solr] branch branch_9x updated: SOLR-16453: Overseer doesn't handle PRS and non-PRS messages properly (#1055)
This is an automated email from the ASF dual-hosted git repository.
ishan pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_9x by this push:
new 3adf13cd0a2 SOLR-16453: Overseer doesn't handle PRS and non-PRS messages properly (#1055)
3adf13cd0a2 is described below
commit 3adf13cd0a2c51c6cd0ceebe32b7aef3d7ca8c52
Author: Ishan Chattopadhyaya <is...@apache.org>
AuthorDate: Tue Oct 18 11:12:21 2022 +0530
SOLR-16453: Overseer doesn't handle PRS and non-PRS messages properly (#1055)
closes #1055
Co-authored-by: Justin Sweeney (justinsweeney@fullstory.com)
---
solr/CHANGES.txt | 2 +
.../apache/solr/cloud/overseer/ZkStateWriter.java | 33 ++++-
.../solr/cloud/overseer/ZkStateWriterTest.java | 139 +++++++++++++++++++++
3 files changed, 169 insertions(+), 5 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 9e30e56142c..300cd1d6d9f 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -213,6 +213,8 @@ Bug Fixes
* SOLR-16440: RefreshCollectionMessage in PRS should update Overseer's ZkStateWriter (noble, Ishan Chattopadhyaya)
+* SOLR-16453: Overseer doesn't handle PRS and non-PRS messages properly (Justin Sweeney, Hitesh Khamesra)
+
Other Changes
---------------------
* SOLR-16351: Upgrade Carrot2 to 4.4.3, upgrade randomizedtesting to 2.8.0. (Dawid Weiss)
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 44d3be57c4b..8fb0ce287fc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -22,6 +22,7 @@ import com.codahale.metrics.Timer;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@@ -203,11 +204,11 @@ public class ZkStateWriter {
throws IllegalStateException, KeeperException, InterruptedException {
Map<String, ZkWriteCommand> commands = new HashMap<>();
commands.put(command.name, command);
- return writePendingUpdates(commands);
+ return writePendingUpdates(commands, false);
}
public ClusterState writePendingUpdates() throws KeeperException, InterruptedException {
- return writePendingUpdates(updates);
+ return writePendingUpdates(updates, true);
}
/**
* Writes all pending updates to ZooKeeper and returns the modified cluster state
@@ -217,13 +218,29 @@ public class ZkStateWriter {
* @throws KeeperException if any ZooKeeper operation results in an error
* @throws InterruptedException if the current thread is interrupted
*/
- public ClusterState writePendingUpdates(Map<String, ZkWriteCommand> updates)
+ public ClusterState writePendingUpdates(
+ Map<String, ZkWriteCommand> updates, boolean resetPendingUpdateCounters)
throws IllegalStateException, KeeperException, InterruptedException {
if (invalidState) {
throw new IllegalStateException(
"ZkStateWriter has seen a tragic error, this instance can no longer be used");
}
+
+ if (log.isDebugEnabled()) {
+ log.debug(
+ String.format(
+ Locale.ROOT,
+ "Request to write pending updates with updates of length: %d, "
+ + "pending updates of length: %d, writing all pending updates: %b",
+ updates.size(),
+ this.updates.size(),
+ updates == this.updates));
+ }
+
if ((updates == this.updates) && !hasPendingUpdates()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Attempted to flush all pending updates, but there are no pending updates");
+ }
return clusterState;
}
Timer.Context timerContext = stats.time("update_state");
@@ -295,10 +312,11 @@ public class ZkStateWriter {
}
updates.clear();
- numUpdates = 0;
}
- lastUpdatedTime = System.nanoTime();
+ if (resetPendingUpdateCounters) {
+ resetPendingUpdateCounters();
+ }
success = true;
} catch (KeeperException.BadVersionException bve) {
// this is a tragic error, we must disallow usage of this instance
@@ -317,6 +335,11 @@ public class ZkStateWriter {
return clusterState;
}
+ public void resetPendingUpdateCounters() {
+ lastUpdatedTime = System.nanoTime();
+ numUpdates = 0;
+ }
+
/**
* @return the most up-to-date cluster state until the last enqueueUpdate operation
*/
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
index ca7577ab3f9..be38ec88fe6 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
@@ -124,6 +124,145 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
}
}
+ public void testZkStateWriterPendingAndNonBatchedTimeExceeded() throws Exception {
+ Path zkDir = createTempDir("testZkStateWriterBatching");
+
+ ZkTestServer server = new ZkTestServer(zkDir);
+
+ SolrZkClient zkClient = null;
+
+ try {
+ server.run();
+
+ zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
+ ZkController.createClusterZkNodes(zkClient);
+
+ try (ZkStateReader reader = new ZkStateReader(zkClient)) {
+ reader.createClusterStateWatchersAndUpdate();
+
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c3", true);
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/prs1", true);
+
+ ZkWriteCommand c1 =
+ new ZkWriteCommand(
+ "c1",
+ new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
+ ZkWriteCommand c2 =
+ new ZkWriteCommand(
+ "c2",
+ new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
+ ZkWriteCommand c3 =
+ new ZkWriteCommand(
+ "c3",
+ new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
+ Map<String, Object> prsProps = new HashMap<>();
+ prsProps.put("perReplicaState", Boolean.TRUE);
+ ZkWriteCommand prs1 =
+ new ZkWriteCommand(
+ "prs1", new DocCollection("prs1", new HashMap<>(), prsProps, DocRouter.DEFAULT, 0));
+ ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
+
+ // First write is flushed immediately
+ ClusterState clusterState =
+ writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
+ clusterState =
+ writer.enqueueUpdate(clusterState, Collections.singletonList(c1), FAIL_ON_WRITE);
+ clusterState =
+ writer.enqueueUpdate(clusterState, Collections.singletonList(c2), FAIL_ON_WRITE);
+
+ Thread.sleep(Overseer.STATE_UPDATE_DELAY + 100);
+ AtomicBoolean didWrite = new AtomicBoolean(false);
+ clusterState =
+ writer.enqueueUpdate(
+ clusterState, Collections.singletonList(prs1), () -> didWrite.set(true));
+ assertTrue("Exceed the update delay, should be flushed", didWrite.get());
+ didWrite.set(false);
+ clusterState =
+ writer.enqueueUpdate(
+ clusterState, Collections.singletonList(c3), () -> didWrite.set(true));
+ assertTrue("Exceed the update delay, should be flushed", didWrite.get());
+ assertTrue(
+ "The updates queue should be empty having been flushed", writer.updates.isEmpty());
+ didWrite.set(false);
+ }
+
+ } finally {
+ IOUtils.close(zkClient);
+ server.shutdown();
+ }
+ }
+
+ public void testZkStateWriterPendingAndNonBatchedBatchSizeExceeded() throws Exception {
+ Path zkDir = createTempDir("testZkStateWriterBatching");
+
+ ZkTestServer server = new ZkTestServer(zkDir);
+
+ SolrZkClient zkClient = null;
+
+ try {
+ server.run();
+
+ zkClient = new SolrZkClient(server.getZkAddress(), OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
+ ZkController.createClusterZkNodes(zkClient);
+
+ try (ZkStateReader reader = new ZkStateReader(zkClient)) {
+ reader.createClusterStateWatchersAndUpdate();
+
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c3", true);
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/prs1", true);
+
+ ZkWriteCommand c1 =
+ new ZkWriteCommand(
+ "c1",
+ new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
+ ZkWriteCommand c2 =
+ new ZkWriteCommand(
+ "c2",
+ new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
+ ZkWriteCommand c3 =
+ new ZkWriteCommand(
+ "c3",
+ new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
+ Map<String, Object> prsProps = new HashMap<>();
+ prsProps.put("perReplicaState", Boolean.TRUE);
+ ZkWriteCommand prs1 =
+ new ZkWriteCommand(
+ "prs1", new DocCollection("prs1", new HashMap<>(), prsProps, DocRouter.DEFAULT, 0));
+ ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
+
+ // First write is flushed immediately
+ ClusterState clusterState =
+ writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
+
+ AtomicBoolean didWrite = new AtomicBoolean(false);
+ AtomicBoolean didWritePrs = new AtomicBoolean(false);
+ for (int i = 0; i <= Overseer.STATE_UPDATE_BATCH_SIZE; i++) {
+ clusterState =
+ writer.enqueueUpdate(
+ clusterState, Collections.singletonList(c3), () -> didWrite.set(true));
+ // Write a PRS update in the middle and make sure we still get the right results
+ if (i == (Overseer.STATE_UPDATE_BATCH_SIZE / 2)) {
+ clusterState =
+ writer.enqueueUpdate(
+ clusterState, Collections.singletonList(prs1), () -> didWritePrs.set(true));
+ }
+ }
+ assertTrue("Exceed the update batch size, should be flushed", didWrite.get());
+ assertTrue("PRS update should always be written", didWritePrs.get());
+ assertTrue(
+ "The updates queue should be empty having been flushed", writer.updates.isEmpty());
+ }
+
+ } finally {
+ IOUtils.close(zkClient);
+ server.shutdown();
+ }
+ }
+
public void testSingleExternalCollection() throws Exception {
Path zkDir = createTempDir("testSingleExternalCollection");