You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by no...@apache.org on 2022/10/20 02:18:21 UTC
[solr] branch branch_9x updated: SOLR-16456: ZkNodeProps to implement MapWriter and DistributedQueue support MapWriter (#1072)
This is an automated email from the ASF dual-hosted git repository.
noble pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_9x by this push:
new f1d3f9a9020 SOLR-16456: ZkNodeProps to implement MapWriter and DistributedQueue support MapWriter (#1072)
f1d3f9a9020 is described below
commit f1d3f9a9020d719f7af8ec0ce799730ccf5893d9
Author: Noble Paul <no...@users.noreply.github.com>
AuthorDate: Thu Oct 20 13:16:39 2022 +1100
SOLR-16456: ZkNodeProps to implement MapWriter and DistributedQueue support MapWriter (#1072)
---
.../src/java/org/apache/solr/cloud/Overseer.java | 10 +++++++---
.../solr/cloud/ShardLeaderElectionContext.java | 3 +--
.../solr/cloud/ShardLeaderElectionContextBase.java | 2 +-
.../java/org/apache/solr/cloud/ZkController.java | 19 +++++++++---------
.../org/apache/solr/cloud/ZkDistributedQueue.java | 6 ++++++
.../solr/cloud/api/collections/AddReplicaCmd.java | 2 +-
.../solr/cloud/api/collections/CollApiCmds.java | 6 +++---
.../api/collections/CollectionCommandContext.java | 6 ++++++
.../api/collections/CollectionHandlingUtils.java | 2 +-
.../solr/cloud/api/collections/CreateShardCmd.java | 3 +--
.../cloud/api/collections/DeleteCollectionCmd.java | 3 +--
.../solr/cloud/api/collections/DeleteShardCmd.java | 5 ++---
.../solr/cloud/api/collections/MigrateCmd.java | 2 +-
.../collections/OcmhCollectionCommandContext.java | 5 +++++
.../api/collections/ReindexCollectionCmd.java | 8 ++++----
.../solr/cloud/api/collections/RestoreCmd.java | 15 +++++++-------
.../solr/cloud/api/collections/SplitShardCmd.java | 23 ++++++++++++----------
.../solr/handler/admin/CollectionsHandler.java | 2 +-
.../org/apache/solr/cloud/DeleteReplicaTest.java | 5 ++---
.../OverseerCollectionConfigSetProcessorTest.java | 4 ++--
.../org/apache/solr/common/cloud/ZkNodeProps.java | 12 ++++++++++-
21 files changed, 86 insertions(+), 57 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index d0c615ab2db..2d4d0d7cf7a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -50,6 +50,7 @@ import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.cloud.overseer.ZkStateWriter;
import org.apache.solr.cloud.overseer.ZkWriteCommand;
import org.apache.solr.common.AlreadyClosedException;
+import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrCloseable;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
@@ -1171,6 +1172,10 @@ public class Overseer implements SolrCloseable {
return reader;
}
+ public void offerStateUpdate(MapWriter mw) throws KeeperException, InterruptedException {
+ offerStateUpdate(Utils.toJSON(mw));
+ }
+
public void offerStateUpdate(byte[] data) throws KeeperException, InterruptedException {
// When cluster state update is distributed, the Overseer cluster state update queue should only
// ever receive QUIT messages. These go to sendQuitToOverseer for execution path clarity.
@@ -1215,8 +1220,7 @@ public class Overseer implements SolrCloseable {
public void sendQuitToOverseer(String overseerId) throws KeeperException, InterruptedException {
getOverseerQuitNotificationQueue()
.offer(
- Utils.toJSON(
- new ZkNodeProps(
- Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower(), ID, overseerId)));
+ new ZkNodeProps(
+ Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower(), ID, overseerId));
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
index 151c60d3d25..994e0e12607 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
@@ -32,7 +32,6 @@ import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.logging.MDCLoggingContext;
@@ -145,7 +144,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
zkController.getSolrCloudManager(),
zkStateReader);
} else {
- zkController.getOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
+ zkController.getOverseer().getStateUpdateQueue().offer(m);
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index 58bffbdbd57..6288a6e39d5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -236,7 +236,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
zkController.getSolrCloudManager(),
zkStateReader);
} else {
- zkController.getOverseer().offerStateUpdate(Utils.toJSON(m));
+ zkController.getOverseer().offerStateUpdate(m);
}
}
if (coll != null && coll.isPerReplicaState()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index e5e7196dcb2..0423cce0896 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1779,7 +1779,7 @@ public class ZkController implements Closeable {
getSolrCloudManager(),
zkStateReader);
} else {
- overseerJobQueue.offer(Utils.toJSON(m));
+ overseerJobQueue.offer(m);
}
}
// extra handling for PRS, we need to write the PRS entries from this node directly,
@@ -1882,7 +1882,7 @@ public class ZkController implements Closeable {
getSolrCloudManager(),
zkStateReader);
} else {
- overseerJobQueue.offer(Utils.toJSON(m));
+ overseerJobQueue.offer(m);
}
}
}
@@ -2509,7 +2509,7 @@ public class ZkController implements Closeable {
log.warn(
"Going to add role {}. It is deprecated to use ADDROLE and consider using Node Roles instead.",
props);
- getOverseerCollectionQueue().offer(Utils.toJSON(props));
+ getOverseerCollectionQueue().offer(props);
}
public CoreContainer getCoreContainer() {
@@ -2960,13 +2960,12 @@ public class ZkController implements Closeable {
// We always send a down node event to overseer to be safe, but overseer will not need to do
// anything for PRS collections
- ZkNodeProps m =
- new ZkNodeProps(
- Overseer.QUEUE_OPERATION,
- OverseerAction.DOWNNODE.toLower(),
- ZkStateReader.NODE_NAME_PROP,
- nodeName);
- overseer.getStateUpdateQueue().offer(Utils.toJSON(m));
+ overseer
+ .getStateUpdateQueue()
+ .offer(
+ m ->
+ m.put(Overseer.QUEUE_OPERATION, OverseerAction.DOWNNODE.toLower())
+ .put(ZkStateReader.NODE_NAME_PROP, nodeName));
} catch (AlreadyClosedException e) {
log.info(
"Not publishing node as DOWN because a resource required to do so is already closed.");
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
index dcb53a78431..5bb27867c62 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
@@ -34,12 +34,14 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
+import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ConnectionManager.IsClosed;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.util.Pair;
+import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
@@ -307,6 +309,10 @@ public class ZkDistributedQueue implements DistributedQueue {
}
}
+ public void offer(MapWriter mw) throws KeeperException, InterruptedException {
+ offer(Utils.toJSON(mw));
+ }
+
/**
* Inserts data into queue. If there are no other queue consumers, the offered element will be
* immediately visible when this method returns.
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
index 1df96cd2986..cd6adf6f687 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/AddReplicaCmd.java
@@ -285,7 +285,7 @@ public class AddReplicaCmd implements CollApiCmds.CollectionApiCommand {
ccc.getZkStateReader());
} else {
try {
- ccc.offerStateUpdate(Utils.toJSON(props));
+ ccc.offerStateUpdate(props);
} catch (Exception e) {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR, "Exception updating Overseer state queue", e);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java
index 2ebe6b5909d..4b75947c227 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollApiCmds.java
@@ -296,7 +296,7 @@ public class CollApiCmds {
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
- ccc.offerStateUpdate(Utils.toJSON(m));
+ ccc.offerStateUpdate(m);
}
}
}
@@ -324,7 +324,7 @@ public class CollApiCmds {
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
- ccc.offerStateUpdate(Utils.toJSON(m));
+ ccc.offerStateUpdate(m);
}
}
}
@@ -403,7 +403,7 @@ public class CollApiCmds {
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
- ccc.offerStateUpdate(Utils.toJSON(message));
+ ccc.offerStateUpdate(message);
}
try {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCommandContext.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCommandContext.java
index 8f0ce6aded1..f167d4dc6b4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCommandContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCommandContext.java
@@ -23,9 +23,11 @@ import org.apache.solr.cloud.DistributedClusterStateUpdater;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.Stats;
import org.apache.solr.cloud.api.collections.CollectionHandlingUtils.ShardRequestTracker;
+import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrCloseable;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.component.ShardHandler;
import org.apache.zookeeper.KeeperException;
@@ -78,6 +80,10 @@ public interface CollectionCommandContext {
"Bug! offerStateUpdate() should not be called when distributed cluster state updates are enabled");
}
+ default void offerStateUpdate(MapWriter data) throws KeeperException, InterruptedException {
+ offerStateUpdate(Utils.toJSON(data));
+ }
+
default String getOverseerId() {
throw new IllegalStateException(
"Bug! getOverseerId() default implementation should never be called");
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionHandlingUtils.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionHandlingUtils.java
index 68a8baceb15..79f9fa9ef98 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionHandlingUtils.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionHandlingUtils.java
@@ -184,7 +184,7 @@ public class CollectionHandlingUtils {
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
- ccc.offerStateUpdate(Utils.toJSON(m));
+ ccc.offerStateUpdate(m);
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
index 78c77b44e4f..f0878e39df6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateShardCmd.java
@@ -37,7 +37,6 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
-import org.apache.solr.common.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,7 +99,7 @@ public class CreateShardCmd implements CollApiCmds.CollectionApiCommand {
} else {
// message contains extCollectionName that might be an alias. Unclear (to me) how this works
// in that case.
- ccc.offerStateUpdate(Utils.toJSON(message));
+ ccc.offerStateUpdate(message);
}
// wait for a while until we see the shard and update the local view of the cluster state
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
index d1eeb1497c3..ac690993c7d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteCollectionCmd.java
@@ -45,7 +45,6 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.Utils;
import org.apache.solr.core.snapshots.SolrSnapshotManager;
import org.apache.solr.handler.admin.ConfigSetsHandler;
import org.apache.zookeeper.KeeperException;
@@ -149,7 +148,7 @@ public class DeleteCollectionCmd implements CollApiCmds.CollectionApiCommand {
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
- ccc.offerStateUpdate(Utils.toJSON(m));
+ ccc.offerStateUpdate(m);
}
// wait for a while until we don't see the collection
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
index 2d0a51e1268..72c93c02c62 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/DeleteShardCmd.java
@@ -43,7 +43,6 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
-import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -121,7 +120,7 @@ public class DeleteShardCmd implements CollApiCmds.CollectionApiCommand {
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
- ccc.offerStateUpdate(Utils.toJSON(m));
+ ccc.offerStateUpdate(m);
}
}
@@ -200,7 +199,7 @@ public class DeleteShardCmd implements CollApiCmds.CollectionApiCommand {
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
- ccc.offerStateUpdate(Utils.toJSON(m));
+ ccc.offerStateUpdate(m);
}
zkStateReader.waitForState(
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
index d753a0e17c6..70c45027ae4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/MigrateCmd.java
@@ -269,7 +269,7 @@ public class MigrateCmd implements CollApiCmds.CollectionApiCommand {
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
- ccc.offerStateUpdate(Utils.toJSON(m));
+ ccc.offerStateUpdate(m);
}
// wait for a while until we see the new rule
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OcmhCollectionCommandContext.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OcmhCollectionCommandContext.java
index 0733c849261..7677da01b40 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OcmhCollectionCommandContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OcmhCollectionCommandContext.java
@@ -22,6 +22,7 @@ import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.cloud.DistributedClusterStateUpdater;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.Stats;
+import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrCloseable;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
@@ -71,6 +72,10 @@ public class OcmhCollectionCommandContext implements CollectionCommandContext {
ocmh.overseer.offerStateUpdate(data);
}
+ public void offerStateUpdate(MapWriter mw) throws KeeperException, InterruptedException {
+ ocmh.overseer.offerStateUpdate(mw);
+ }
+
@Override
public SolrCloseable getCloseableToLatchOn() {
return ocmh;
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
index 9a35d9a712e..37dc7eb9b82 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/ReindexCollectionCmd.java
@@ -409,7 +409,7 @@ public class ReindexCollectionCmd implements CollApiCmds.CollectionApiCommand {
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
- ccc.offerStateUpdate(Utils.toJSON(cmd));
+ ccc.offerStateUpdate(cmd);
}
TestInjection.injectReindexLatch();
@@ -563,7 +563,7 @@ public class ReindexCollectionCmd implements CollApiCmds.CollectionApiCommand {
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
- ccc.offerStateUpdate(Utils.toJSON(props));
+ ccc.offerStateUpdate(props);
}
}
// 9. set FINISHED state on the target and clear the state on the source
@@ -583,7 +583,7 @@ public class ReindexCollectionCmd implements CollApiCmds.CollectionApiCommand {
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
- ccc.offerStateUpdate(Utils.toJSON(props));
+ ccc.offerStateUpdate(props);
}
reindexingState.put(STATE, State.FINISHED.toLower());
@@ -964,7 +964,7 @@ public class ReindexCollectionCmd implements CollApiCmds.CollectionApiCommand {
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
- ccc.offerStateUpdate(Utils.toJSON(props));
+ ccc.offerStateUpdate(props);
}
removeReindexingState(collection);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
index 3a18fab6b37..584a260e00d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/RestoreCmd.java
@@ -52,6 +52,8 @@ import org.apache.solr.cloud.DistributedClusterStateUpdater;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.CollectionHandlingUtils.ShardRequestTracker;
import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.LinkedHashMapWriter;
+import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
@@ -70,7 +72,6 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.common.util.Utils;
import org.apache.solr.core.ConfigSetService;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.backup.BackupManager;
@@ -400,7 +401,7 @@ public class RestoreCmd implements CollApiCmds.CollectionApiCommand {
private void markAllShardsAsConstruction(DocCollection restoreCollection)
throws KeeperException, InterruptedException {
// TODO might instead createCollection accept an initial state? Is there a race?
- Map<String, Object> propMap = new HashMap<>();
+ LinkedHashMapWriter<Object> propMap = new LinkedHashMapWriter<>();
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
for (Slice shard : restoreCollection.getSlices()) {
propMap.put(shard.getName(), Slice.State.CONSTRUCTION.toString());
@@ -410,11 +411,11 @@ public class RestoreCmd implements CollApiCmds.CollectionApiCommand {
ccc.getDistributedClusterStateUpdater()
.doSingleStateUpdate(
DistributedClusterStateUpdater.MutatingCommand.SliceUpdateShardState,
- new ZkNodeProps(propMap),
+ new ZkNodeProps((MapWriter) propMap),
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
- ccc.offerStateUpdate(Utils.toJSON(new ZkNodeProps(propMap)));
+ ccc.offerStateUpdate(propMap);
}
}
@@ -558,7 +559,7 @@ public class RestoreCmd implements CollApiCmds.CollectionApiCommand {
// Mark all shards in ACTIVE STATE
private void markAllShardsAsActive(DocCollection restoreCollection)
throws KeeperException, InterruptedException {
- HashMap<String, Object> propMap = new HashMap<>();
+ LinkedHashMapWriter<Object> propMap = new LinkedHashMapWriter<>();
propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
propMap.put(ZkStateReader.COLLECTION_PROP, restoreCollection.getName());
for (Slice shard : restoreCollection.getSlices()) {
@@ -568,11 +569,11 @@ public class RestoreCmd implements CollApiCmds.CollectionApiCommand {
ccc.getDistributedClusterStateUpdater()
.doSingleStateUpdate(
DistributedClusterStateUpdater.MutatingCommand.SliceUpdateShardState,
- new ZkNodeProps(propMap),
+ new ZkNodeProps((MapWriter) propMap),
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
- ccc.offerStateUpdate(Utils.toJSON(new ZkNodeProps(propMap)));
+ ccc.offerStateUpdate(propMap);
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
index bdd2946da40..a0bf85600d9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/SplitShardCmd.java
@@ -49,6 +49,8 @@ import org.apache.solr.cloud.DistributedClusterStateUpdater;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.CollectionHandlingUtils.ShardRequestTracker;
import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.LinkedHashMapWriter;
+import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
@@ -389,7 +391,7 @@ public class SplitShardCmd implements CollApiCmds.CollectionApiCommand {
log.debug("Creating slice {} of collection {} on {}", subSlice, collectionName, nodeName);
- Map<String, Object> propMap = new HashMap<>();
+ LinkedHashMapWriter<Object> propMap = new LinkedHashMapWriter<>();
propMap.put(Overseer.QUEUE_OPERATION, CREATESHARD.toLower());
propMap.put(ZkStateReader.SHARD_ID_PROP, subSlice);
propMap.put(ZkStateReader.COLLECTION_PROP, collectionName);
@@ -403,11 +405,11 @@ public class SplitShardCmd implements CollApiCmds.CollectionApiCommand {
ccc.getDistributedClusterStateUpdater()
.doSingleStateUpdate(
DistributedClusterStateUpdater.MutatingCommand.CollectionCreateShard,
- new ZkNodeProps(propMap),
+ new ZkNodeProps((MapWriter) propMap),
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
- ccc.offerStateUpdate(Utils.toJSON(new ZkNodeProps(propMap)));
+ ccc.offerStateUpdate(propMap);
}
// wait until we are able to see the new shard in cluster state and refresh the local view
@@ -423,7 +425,7 @@ public class SplitShardCmd implements CollApiCmds.CollectionApiCommand {
subSlice,
collectionName,
nodeName);
- propMap = new HashMap<>();
+ propMap = new LinkedHashMapWriter<>();
propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
propMap.put(COLLECTION_PROP, collectionName);
propMap.put(SHARD_ID_PROP, subSlice);
@@ -443,7 +445,8 @@ public class SplitShardCmd implements CollApiCmds.CollectionApiCommand {
if (asyncId != null) {
propMap.put(ASYNC, asyncId);
}
- new AddReplicaCmd(ccc).addReplica(clusterState, new ZkNodeProps(propMap), results, null);
+ new AddReplicaCmd(ccc)
+ .addReplica(clusterState, new ZkNodeProps((MapWriter) propMap), results, null);
}
{
@@ -642,7 +645,7 @@ public class SplitShardCmd implements CollApiCmds.CollectionApiCommand {
hasRecordedDistributedUpdate = true;
scr.record(DistributedClusterStateUpdater.MutatingCommand.SliceAddReplica, props);
} else {
- ccc.offerStateUpdate(Utils.toJSON(props));
+ ccc.offerStateUpdate(props);
}
HashMap<String, Object> propMap = new HashMap<>();
@@ -708,7 +711,7 @@ public class SplitShardCmd implements CollApiCmds.CollectionApiCommand {
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
- ccc.offerStateUpdate(Utils.toJSON(m));
+ ccc.offerStateUpdate(m);
}
if (leaderZnodeStat == null) {
@@ -764,7 +767,7 @@ public class SplitShardCmd implements CollApiCmds.CollectionApiCommand {
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
- ccc.offerStateUpdate(Utils.toJSON(m));
+ ccc.offerStateUpdate(m);
}
} else {
log.debug("Requesting shard state be set to 'recovery' for sub-shards: {}", subSlices);
@@ -783,7 +786,7 @@ public class SplitShardCmd implements CollApiCmds.CollectionApiCommand {
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
- ccc.offerStateUpdate(Utils.toJSON(m));
+ ccc.offerStateUpdate(m);
}
}
@@ -986,7 +989,7 @@ public class SplitShardCmd implements CollApiCmds.CollectionApiCommand {
ccc.getSolrCloudManager(),
ccc.getZkStateReader());
} else {
- ccc.offerStateUpdate(Utils.toJSON(m));
+ ccc.offerStateUpdate(m);
}
} catch (Exception e) {
// don't give up yet - just log the error, we may still be able to clean up
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 3abc393212f..bcb0b1e2ae7 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -398,7 +398,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
if (coreContainer.getZkController().claimAsyncId(asyncId)) {
boolean success = false;
try {
- coreContainer.getZkController().getOverseerCollectionQueue().offer(Utils.toJSON(m));
+ coreContainer.getZkController().getOverseerCollectionQueue().offer(m);
success = true;
} finally {
if (!success) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
index f5fb02391c7..3696f4c104b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
@@ -44,7 +44,6 @@ import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZkStateReaderAccessor;
import org.apache.solr.common.util.TimeSource;
-import org.apache.solr.common.util.Utils;
import org.apache.solr.core.ZkContainer;
import org.apache.solr.util.TimeOut;
import org.junit.After;
@@ -281,7 +280,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
cluster.getOpenOverseer().getSolrCloudManager(),
cluster.getOpenOverseer().getZkStateReader());
} else {
- cluster.getOpenOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
+ cluster.getOpenOverseer().getStateUpdateQueue().offer(m);
}
waitForState(
@@ -373,7 +372,7 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
cluster.getOpenOverseer().getSolrCloudManager(),
cluster.getOpenOverseer().getZkStateReader());
} else {
- cluster.getOpenOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
+ cluster.getOpenOverseer().getStateUpdateQueue().offer(m);
}
boolean replicaDeleted = false;
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index f90e3805405..843c8dd731f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -572,7 +572,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
try {
handleCreateCollMessage(invocation.getArgument(0));
verify(stateUpdateQueueMock, Mockito.atLeast(0))
- .offer(invocation.getArgument(0));
+ .offer((byte[]) invocation.getArgument(0));
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
@@ -583,7 +583,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
}
})
.when(overseerMock)
- .offerStateUpdate(any());
+ .offerStateUpdate((byte[]) any());
}
when(zkControllerMock.getZkClient()).thenReturn(solrZkClientMock);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkNodeProps.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkNodeProps.java
index 8e5652e3a34..0e6dcef7108 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkNodeProps.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkNodeProps.java
@@ -25,12 +25,13 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import org.apache.solr.common.MapWriter;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.common.util.Utils;
import org.noggit.JSONWriter;
/** ZkNodeProps contains generic immutable properties. */
-public class ZkNodeProps implements JSONWriter.Writable {
+public class ZkNodeProps implements JSONWriter.Writable, MapWriter {
protected final Map<String, Object> propMap;
@@ -42,6 +43,10 @@ public class ZkNodeProps implements JSONWriter.Writable {
// Always wrapping introduces a memory leak.
}
+ public ZkNodeProps(MapWriter mw) {
+ propMap = mw.toMap(new HashMap<>());
+ }
+
public ZkNodeProps plus(String key, Object val) {
return plus(Collections.singletonMap(key, val));
}
@@ -143,6 +148,11 @@ public class ZkNodeProps implements JSONWriter.Writable {
return Boolean.parseBoolean(o.toString());
}
+ @Override
+ public void writeMap(EntryWriter ew) throws IOException {
+ propMap.forEach(ew.getBiConsumer());
+ }
+
@Override
public boolean equals(Object that) {
return that instanceof ZkNodeProps && ((ZkNodeProps) that).propMap.equals(this.propMap);