You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by no...@apache.org on 2022/11/11 00:06:43 UTC
[solr] branch branch_9x updated: SOLR-16456 : More Utils.toJson eliminated (#1171)
This is an automated email from the ASF dual-hosted git repository.
noble pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_9x by this push:
new 7a3e5da236e SOLR-16456 : More Utils.toJson eliminated (#1171)
7a3e5da236e is described below
commit 7a3e5da236e541be1e684b4742b183db7158a94b
Author: Noble Paul <no...@users.noreply.github.com>
AuthorDate: Fri Nov 11 11:05:15 2022 +1100
SOLR-16456 : More Utils.toJson eliminated (#1171)
---
.../src/java/org/apache/solr/cloud/Overseer.java | 5 +-
.../java/org/apache/solr/cloud/ZkController.java | 102 ++++++++++-----------
.../org/apache/solr/cloud/ZkDistributedQueue.java | 6 --
.../org/apache/solr/cloud/DeleteShardTest.java | 55 ++++-------
.../apache/solr/cloud/OverseerTaskQueueTest.java | 22 +++--
.../test/org/apache/solr/cloud/OverseerTest.java | 53 ++++++-----
.../solr/cloud/TestRandomRequestDistribution.java | 3 +-
.../org/apache/solr/cloud/ZkControllerTest.java | 37 ++++----
.../solr/client/solrj/cloud/DistributedQueue.java | 9 +-
.../src/java/org/apache/solr/common/MapWriter.java | 8 ++
10 files changed, 144 insertions(+), 156 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 6736affd526..28a644fb0e3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -1221,7 +1221,8 @@ public class Overseer implements SolrCloseable {
public void sendQuitToOverseer(String overseerId) throws KeeperException, InterruptedException {
getOverseerQuitNotificationQueue()
.offer(
- new ZkNodeProps(
- Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower(), ID, overseerId));
+ ew ->
+ ew.put(Overseer.QUEUE_OPERATION, OverseerAction.QUIT.toLower())
+ .put(ID, overseerId));
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 48ab82b53bb..f59d20f6d04 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -70,6 +70,7 @@ import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.common.AlreadyClosedException;
+import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.StringUtils;
@@ -1706,35 +1707,42 @@ public class ZkController implements Closeable {
String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
- Map<String, Object> props = new HashMap<>();
- props.put(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower());
- props.put(ZkStateReader.STATE_PROP, state.toString());
- props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
- props.put(ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles());
- props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
- props.put(ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(getNodeName()));
- props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId());
- props.put(ZkStateReader.COLLECTION_PROP, collection);
- props.put(ZkStateReader.REPLICA_TYPE, cd.getCloudDescriptor().getReplicaType().toString());
- props.put(ZkStateReader.FORCE_SET_STATE_PROP, "false");
- if (numShards != null) {
- props.put(ZkStateReader.NUM_SHARDS_PROP, numShards.toString());
- }
- if (coreNodeName != null) {
- props.put(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
- }
+ MapWriter m =
+ props -> {
+ props.put(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower());
+ props.put(ZkStateReader.STATE_PROP, state.toString());
+ props.put(ZkStateReader.CORE_NAME_PROP, cd.getName());
+ props.put(ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles());
+ props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+ props.put(
+ ZkStateReader.BASE_URL_PROP, zkStateReader.getBaseUrlForNodeName(getNodeName()));
+ props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId());
+ props.put(ZkStateReader.COLLECTION_PROP, collection);
+ props.put(
+ ZkStateReader.REPLICA_TYPE, cd.getCloudDescriptor().getReplicaType().toString());
+ props.put(ZkStateReader.FORCE_SET_STATE_PROP, "false");
+ if (numShards != null) {
+ props.put(ZkStateReader.NUM_SHARDS_PROP, numShards.toString());
+ }
+ props.putIfNotNull(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
+ };
+
try (SolrCore core = cc.getCore(cd.getName())) {
if (core != null && state == Replica.State.ACTIVE) {
ensureRegisteredSearcher(core);
}
if (core != null && core.getDirectoryFactory().isSharedStorage()) {
if (core.getDirectoryFactory().isSharedStorage()) {
- props.put(ZkStateReader.SHARED_STORAGE_PROP, "true");
- props.put("dataDir", core.getDataDir());
- UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
- if (ulog != null) {
- props.put("ulogDir", ulog.getLogDir());
- }
+ m =
+ m.append(
+ props -> {
+ props.put(ZkStateReader.SHARED_STORAGE_PROP, "true");
+ props.put("dataDir", core.getDataDir());
+ UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+ if (ulog != null) {
+ props.put("ulogDir", ulog.getLogDir());
+ }
+ });
}
}
} catch (SolrCoreInitializationException ex) {
@@ -1757,8 +1765,6 @@ public class ZkController implements Closeable {
getShardTerms(collection, shardId).doneRecovering(coreNodeName);
}
- ZkNodeProps m = new ZkNodeProps(props);
-
if (updateLastState) {
cd.getCloudDescriptor().setLastPublished(state);
}
@@ -1767,7 +1773,7 @@ public class ZkController implements Closeable {
if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
distributedClusterStateUpdater.doSingleStateUpdate(
DistributedClusterStateUpdater.MutatingCommand.ReplicaSetState,
- m,
+ new ZkNodeProps(m),
getSolrCloudManager(),
zkStateReader);
} else {
@@ -1868,24 +1874,20 @@ public class ZkController implements Closeable {
PerReplicaStatesOps.deleteReplica(coreNodeName, perReplicaStates)
.persist(docCollection.getZNode(), zkClient);
}
- ZkNodeProps m =
- new ZkNodeProps(
- Overseer.QUEUE_OPERATION,
- OverseerAction.DELETECORE.toLower(),
- ZkStateReader.CORE_NAME_PROP,
- coreName,
- ZkStateReader.NODE_NAME_PROP,
- getNodeName(),
- ZkStateReader.BASE_URL_PROP,
- zkStateReader.getBaseUrlForNodeName(getNodeName()),
- ZkStateReader.COLLECTION_PROP,
- cloudDescriptor.getCollectionName(),
- ZkStateReader.CORE_NODE_NAME_PROP,
- coreNodeName);
+ MapWriter m =
+ ew ->
+ ew.put(Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower())
+ .put(ZkStateReader.CORE_NAME_PROP, coreName)
+ .put(ZkStateReader.NODE_NAME_PROP, getNodeName())
+ .put(
+ ZkStateReader.BASE_URL_PROP,
+ zkStateReader.getBaseUrlForNodeName(getNodeName()))
+ .put(ZkStateReader.COLLECTION_PROP, cloudDescriptor.getCollectionName())
+ .put(ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
if (distributedClusterStateUpdater.isDistributedStateUpdate()) {
distributedClusterStateUpdater.doSingleStateUpdate(
DistributedClusterStateUpdater.MutatingCommand.SliceRemoveReplica,
- m,
+ new ZkNodeProps(m),
getSolrCloudManager(),
zkStateReader);
} else {
@@ -2503,19 +2505,15 @@ public class ZkController implements Closeable {
}
public void setPreferredOverseer() throws KeeperException, InterruptedException {
- ZkNodeProps props =
- new ZkNodeProps(
- Overseer.QUEUE_OPERATION,
- ADDROLE.toString().toLowerCase(Locale.ROOT),
- "node",
- getNodeName(),
- "role",
- "overseer",
- "persist",
- "false");
+ MapWriter props =
+ ew ->
+ ew.put(Overseer.QUEUE_OPERATION, ADDROLE.toString().toLowerCase(Locale.ROOT))
+ .put("node", getNodeName()) // key is the literal "node"; the value is this node's name
+ .put("role", "overseer")
+ .put("persist", "false");
log.warn(
"Going to add role {}. It is deprecated to use ADDROLE and consider using Node Roles instead.",
- props);
+ props.jsonStr());
getOverseerCollectionQueue().offer(props);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
index 1332661019b..f2009301a07 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
@@ -34,14 +34,12 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
-import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ConnectionManager.IsClosed;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCmdExecutor;
import org.apache.solr.common.util.Pair;
-import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
@@ -309,10 +307,6 @@ public class ZkDistributedQueue implements DistributedQueue {
}
}
- public void offer(MapWriter mw) throws KeeperException, InterruptedException {
- offer(Utils.toJSON(mw));
- }
-
/**
* Inserts data into queue. If there are no other queue consumers, the offered element will be
* immediately visible when this method returns.
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
index b763b58bad6..63bd18ea46d 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteShardTest.java
@@ -17,20 +17,18 @@
package org.apache.solr.cloud;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.cloud.DistributedQueue;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CoreStatus;
import org.apache.solr.cloud.overseer.OverseerAction;
+import org.apache.solr.common.MapWriter;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.Slice.State;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.Utils;
import org.apache.solr.util.FileUtils;
import org.junit.After;
import org.junit.Before;
@@ -66,48 +64,39 @@ public class DeleteShardTest extends SolrCloudTestCase {
// Can't delete an ACTIVE shard
expectThrows(
Exception.class,
- () -> {
- CollectionAdminRequest.deleteShard(collection, "shard1").process(cluster.getSolrClient());
- });
+ () ->
+ CollectionAdminRequest.deleteShard(collection, "shard1")
+ .process(cluster.getSolrClient()));
setSliceState(collection, "shard1", Slice.State.INACTIVE);
// Can delete an INACTIVE shard
CollectionAdminRequest.deleteShard(collection, "shard1").process(cluster.getSolrClient());
waitForState(
- "Expected 'shard1' to be removed",
- collection,
- (n, c) -> {
- return c.getSlice("shard1") == null;
- });
+ "Expected 'shard1' to be removed", collection, (n, c) -> c.getSlice("shard1") == null);
// Can delete a shard under construction
setSliceState(collection, "shard2", Slice.State.CONSTRUCTION);
CollectionAdminRequest.deleteShard(collection, "shard2").process(cluster.getSolrClient());
waitForState(
- "Expected 'shard2' to be removed",
- collection,
- (n, c) -> {
- return c.getSlice("shard2") == null;
- });
+ "Expected 'shard2' to be removed", collection, (n, c) -> c.getSlice("shard2") == null);
}
protected void setSliceState(String collection, String slice, State state) throws Exception {
// TODO can this be encapsulated better somewhere?
- Map<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower());
- propMap.put(slice, state.toString());
- propMap.put(ZkStateReader.COLLECTION_PROP, collection);
- ZkNodeProps m = new ZkNodeProps(propMap);
-
+ MapWriter m =
+ ew ->
+ ew.put(Overseer.QUEUE_OPERATION, OverseerAction.UPDATESHARDSTATE.toLower())
+ .put(slice, state.toString())
+ .put(ZkStateReader.COLLECTION_PROP, collection);
final Overseer overseer = cluster.getOpenOverseer();
if (overseer.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
overseer
.getDistributedClusterStateUpdater()
.doSingleStateUpdate(
DistributedClusterStateUpdater.MutatingCommand.SliceUpdateShardState,
- m,
+ new ZkNodeProps(m),
cluster.getOpenOverseer().getSolrCloudManager(),
cluster.getOpenOverseer().getZkStateReader());
} else {
@@ -118,15 +107,13 @@ public class DeleteShardTest extends SolrCloudTestCase {
.getZkController()
.getOverseer()
.getStateUpdateQueue();
- inQueue.offer(Utils.toJSON(m));
+ inQueue.offer(m);
}
waitForState(
"Expected shard " + slice + " to be in state " + state,
collection,
- (n, c) -> {
- return c.getSlice(slice).getState() == state;
- });
+ (n, c) -> c.getSlice(slice).getState() == state);
}
@Test
@@ -152,12 +139,7 @@ public class DeleteShardTest extends SolrCloudTestCase {
// Delete shard 'a'
CollectionAdminRequest.deleteShard(collection, "a").process(cluster.getSolrClient());
- waitForState(
- "Expected 'a' to be removed",
- collection,
- (n, c) -> {
- return c.getSlice("a") == null;
- });
+ waitForState("Expected 'a' to be removed", collection, (n, c) -> c.getSlice("a") == null);
assertEquals(2, getCollectionState(collection).getActiveSlices().size());
assertFalse(
@@ -173,12 +155,7 @@ public class DeleteShardTest extends SolrCloudTestCase {
.setDeleteInstanceDir(false)
.process(cluster.getSolrClient());
- waitForState(
- "Expected 'b' to be removed",
- collection,
- (n, c) -> {
- return c.getSlice("b") == null;
- });
+ waitForState("Expected 'b' to be removed", collection, (n, c) -> c.getSlice("b") == null);
assertEquals(1, getCollectionState(collection).getActiveSlices().size());
assertTrue(
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
index 64f1c5d4c29..27039870857 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTaskQueueTest.java
@@ -17,10 +17,10 @@
package org.apache.solr.cloud;
import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.solr.cloud.api.collections.CollectionHandlingUtils;
+import org.apache.solr.common.MapWriter;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CommonAdminParams;
@@ -47,13 +47,14 @@ public class OverseerTaskQueueTest extends DistributedQueueTest {
// Basic ops
// Put an expected Overseer task onto the queue
- final Map<String, Object> props = new HashMap<>();
- props.put(CommonParams.NAME, "coll1");
- props.put(CollectionAdminParams.COLL_CONF, "myconf");
- props.put(CollectionHandlingUtils.NUM_SLICES, 1);
- props.put(ZkStateReader.REPLICATION_FACTOR, 3);
- props.put(CommonAdminParams.ASYNC, requestId);
- tq.offer(Utils.toJSON(props));
+ MapWriter props =
+ ew ->
+ ew.put(CommonParams.NAME, "coll1")
+ .put(CollectionAdminParams.COLL_CONF, "myconf")
+ .put(CollectionHandlingUtils.NUM_SLICES, 1)
+ .put(ZkStateReader.REPLICATION_FACTOR, 3)
+ .put(CommonAdminParams.ASYNC, requestId);
+ tq.offer(props);
assertTrue(
"Task queue should contain task with requestid " + requestId,
@@ -71,8 +72,9 @@ public class OverseerTaskQueueTest extends DistributedQueueTest {
// containsTaskWithRequestId runs while the response is still in the queue.
String watchID = tq.createResponseNode();
String requestId2 = "baz";
- props.put(CommonAdminParams.ASYNC, requestId2);
- tq.createRequestNode(Utils.toJSON(props), watchID);
+
+ tq.createRequestNode(
+ Utils.toJSON(props.append(ew -> ew.put(CommonAdminParams.ASYNC, requestId2))), watchID);
// Set a SolrResponse as the response node by removing the QueueEvent, as done in
// OverseerTaskProcessor
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index 039890921c1..83a7a8764a0 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -72,7 +72,6 @@ import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.TimeSource;
-import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CloudConfig;
import org.apache.solr.core.ClusterSingletons;
import org.apache.solr.core.CoreContainer;
@@ -208,7 +207,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
// Look for "new Overseer" calls in this class.
assertFalse(overseer.getDistributedClusterStateUpdater().isDistributedStateUpdate());
ZkDistributedQueue q = overseer.getStateUpdateQueue();
- q.offer(Utils.toJSON(m));
+ q.offer(m);
}
public String publishState(
@@ -242,7 +241,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
collection);
assertFalse(overseer.getDistributedClusterStateUpdater().isDistributedStateUpdate());
ZkDistributedQueue q = overseer.getStateUpdateQueue();
- q.offer(Utils.toJSON(m));
+ q.offer(m);
return null;
} else {
ZkNodeProps m =
@@ -266,7 +265,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.NUM_SHARDS_PROP,
Integer.toString(numShards));
ZkDistributedQueue q = overseer.getStateUpdateQueue();
- q.offer(Utils.toJSON(m));
+ q.offer(m);
}
if (startElection && collection.length() > 0) {
@@ -460,7 +459,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
"createNodeSet",
"");
ZkDistributedQueue q = getOverseerZero().getStateUpdateQueue();
- q.offer(Utils.toJSON(m));
+ q.offer(m);
}
@Test
@@ -729,7 +728,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkDistributedQueue q = getOverseerZero().getStateUpdateQueue();
- q.offer(Utils.toJSON(m));
+ q.offer(m);
verifyReplicaStatus(
reader, commands.get(0).name, "shard1", "core_node1", Replica.State.DOWN);
@@ -813,7 +812,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.STATE_PROP,
Replica.State.RECOVERING.toString());
- q.offer(Utils.toJSON(m));
+ q.offer(m);
waitForCollections(reader, COLLECTION);
verifyReplicaStatus(reader, "collection1", "shard1", "core_node1", Replica.State.RECOVERING);
@@ -836,7 +835,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.STATE_PROP,
Replica.State.ACTIVE.toString());
- q.offer(Utils.toJSON(m));
+ q.offer(m);
verifyReplicaStatus(reader, "collection1", "shard1", "core_node1", Replica.State.ACTIVE);
@@ -1138,11 +1137,11 @@ public class OverseerTest extends SolrTestCaseJ4 {
"createNodeSet",
"");
ZkDistributedQueue workQueue = Overseer.getInternalWorkQueue(zkClient, new Stats());
- workQueue.offer(Utils.toJSON(badMessage));
+ workQueue.offer(badMessage);
overseerClient = electNewOverseer(server.getZkAddress());
ZkDistributedQueue q = getOpenOverseer().getStateUpdateQueue();
- q.offer(Utils.toJSON(badMessage));
+ q.offer(badMessage);
TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
while (!timeOut.hasTimedOut()) {
@@ -1226,7 +1225,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
"createNodeSet",
"");
ZkDistributedQueue q = getOpenOverseer().getStateUpdateQueue();
- q.offer(Utils.toJSON(m));
+ q.offer(m);
break;
} catch (SolrException | KeeperException | AlreadyClosedException e) {
log.error("error updating state", e);
@@ -1462,7 +1461,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.REPLICATION_FACTOR,
"1");
ZkDistributedQueue q = getOverseerZero().getStateUpdateQueue();
- q.offer(Utils.toJSON(m));
+ q.offer(m);
}
for (int i = 0, j = 0, k = 0; i < MAX_STATE_CHANGES; i++, j++, k++) {
@@ -1485,7 +1484,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.NUM_SHARDS_PROP,
"1");
ZkDistributedQueue q = getOverseerZero().getStateUpdateQueue();
- q.offer(Utils.toJSON(m));
+ q.offer(m);
if (j >= MAX_COLLECTIONS - 1) j = 0;
if (k >= MAX_CORES - 1) k = 0;
if (i > 0 && i % 100 == 0) log.info("Published {} items", i);
@@ -1585,7 +1584,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
"1",
"createNodeSet",
"");
- queue.offer(Utils.toJSON(m));
+ queue.offer(m);
m =
new ZkNodeProps(
Overseer.QUEUE_OPERATION,
@@ -1602,7 +1601,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
"",
ZkStateReader.STATE_PROP,
Replica.State.RECOVERING.toString());
- queue.offer(Utils.toJSON(m));
+ queue.offer(m);
m =
new ZkNodeProps(
Overseer.QUEUE_OPERATION,
@@ -1619,7 +1618,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
"",
ZkStateReader.STATE_PROP,
Replica.State.RECOVERING.toString());
- queue.offer(Utils.toJSON(m));
+ queue.offer(m);
overseerClient = electNewOverseer(server.getZkAddress());
@@ -1641,7 +1640,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
"",
ZkStateReader.STATE_PROP,
Replica.State.RECOVERING.toString());
- queue.offer(Utils.toJSON(m));
+ queue.offer(m);
reader.waitForState(
COLLECTION,
@@ -1705,7 +1704,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.STATE_PROP,
Replica.State.DOWN.toString());
- q.offer(Utils.toJSON(m));
+ q.offer(m);
waitForCollections(reader, "c1");
verifyReplicaStatus(reader, "c1", "shard1", "core_node1", Replica.State.DOWN);
@@ -1727,7 +1726,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.STATE_PROP,
Replica.State.RECOVERING.toString());
- q.offer(Utils.toJSON(m));
+ q.offer(m);
m =
new ZkNodeProps(
@@ -1746,7 +1745,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.STATE_PROP,
Replica.State.ACTIVE.toString());
- q.offer(Utils.toJSON(m));
+ q.offer(m);
final String testCollectionName = "test";
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/" + testCollectionName, true);
@@ -1760,7 +1759,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
"1",
ZkStateReader.REPLICATION_FACTOR,
"1");
- q.offer(Utils.toJSON(m));
+ q.offer(m);
// Wait for the overseer to create state.json for the collection
waitForCollections(reader, testCollectionName);
@@ -1781,7 +1780,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
"x",
ZkStateReader.REPLICATION_FACTOR,
"1");
- q.offer(Utils.toJSON(m));
+ q.offer(m);
m =
new ZkNodeProps(
@@ -1799,7 +1798,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
"core1",
ZkStateReader.STATE_PROP,
Replica.State.DOWN.toString());
- q.offer(Utils.toJSON(m));
+ q.offer(m);
// Verify replica creation worked ok in spite of external update of state.json (although in
// theory such updates do not happen unless an old overseer is still updating ZK after a new
@@ -1977,7 +1976,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
numShards.toString(),
ZkStateReader.REPLICATION_FACTOR,
"1");
- q.offer(Utils.toJSON(m));
+ q.offer(m);
}
waitForCollections(zkStateReader, COLLECTION);
@@ -2004,7 +2003,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.STATE_PROP,
Replica.State.RECOVERING.toString());
- q.offer(Utils.toJSON(m));
+ q.offer(m);
}
}
// verify recovering
@@ -2037,7 +2036,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.STATE_PROP,
Replica.State.ACTIVE.toString());
- q.offer(Utils.toJSON(m));
+ q.offer(m);
}
}
// verify active
@@ -2062,7 +2061,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
ZkStateReader.CORE_NODE_NAME_PROP,
"core_node" + N);
- q.offer(Utils.toJSON(m));
+ q.offer(m);
{
String shard = "shard" + ss;
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestRandomRequestDistribution.java b/solr/core/src/test/org/apache/solr/cloud/TestRandomRequestDistribution.java
index 949f4b29e94..2d093cd7e16 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestRandomRequestDistribution.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestRandomRequestDistribution.java
@@ -39,7 +39,6 @@ import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.metrics.SolrMetricManager;
@@ -204,7 +203,7 @@ public class TestRandomRequestDistribution extends AbstractFullDistribZkTestBase
overseer.getZkStateReader());
} else {
ZkDistributedQueue q = overseer.getStateUpdateQueue();
- q.offer(Utils.toJSON(m));
+ q.offer(m);
}
verifyReplicaStatus(
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
index 0ddf70c360f..004741b1801 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
@@ -24,12 +24,12 @@ import static org.mockito.Mockito.mock;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.MapWriter;
import org.apache.solr.common.cloud.ClusterProperties;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.SolrZkClient;
@@ -285,13 +285,15 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
zkController.getOverseerJobQueue().offer(Utils.toJSON(m));
}
- HashMap<String, Object> propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
- propMap.put(COLLECTION_PROP, collectionName);
- propMap.put(SHARD_ID_PROP, "shard1");
- propMap.put(ZkStateReader.NODE_NAME_PROP, "non_existent_host:1_");
- propMap.put(ZkStateReader.CORE_NAME_PROP, collectionName);
- propMap.put(ZkStateReader.STATE_PROP, "active");
+ MapWriter propMap =
+ ew ->
+ ew.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower())
+ .put(COLLECTION_PROP, collectionName)
+ .put(SHARD_ID_PROP, "shard1")
+ .put(ZkStateReader.NODE_NAME_PROP, "non_existent_host:1_")
+ .put(ZkStateReader.CORE_NAME_PROP, collectionName)
+ .put(ZkStateReader.STATE_PROP, "active");
+
if (zkController.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
zkController
.getDistributedClusterStateUpdater()
@@ -301,16 +303,17 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
zkController.getSolrCloudManager(),
zkController.getZkStateReader());
} else {
- zkController.getOverseerJobQueue().offer(Utils.toJSON(propMap));
+ zkController.getOverseerJobQueue().offer(propMap);
}
- propMap = new HashMap<>();
- propMap.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower());
- propMap.put(COLLECTION_PROP, collectionName);
- propMap.put(SHARD_ID_PROP, "shard1");
- propMap.put(ZkStateReader.NODE_NAME_PROP, "non_existent_host:2_");
- propMap.put(ZkStateReader.CORE_NAME_PROP, collectionName);
- propMap.put(ZkStateReader.STATE_PROP, "down");
+ propMap =
+ ew ->
+ ew.put(Overseer.QUEUE_OPERATION, ADDREPLICA.toLower())
+ .put(COLLECTION_PROP, collectionName)
+ .put(SHARD_ID_PROP, "shard1")
+ .put(ZkStateReader.NODE_NAME_PROP, "non_existent_host:2_")
+ .put(ZkStateReader.CORE_NAME_PROP, collectionName)
+ .put(ZkStateReader.STATE_PROP, "down");
if (zkController.getDistributedClusterStateUpdater().isDistributedStateUpdate()) {
zkController
.getDistributedClusterStateUpdater()
@@ -320,7 +323,7 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
zkController.getSolrCloudManager(),
zkController.getZkStateReader());
} else {
- zkController.getOverseerJobQueue().offer(Utils.toJSON(propMap));
+ zkController.getOverseerJobQueue().offer(propMap);
}
zkController.getZkStateReader().forciblyRefreshAllClusterStateSlow();
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/cloud/DistributedQueue.java b/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/cloud/DistributedQueue.java
index 097f013a3c0..478f9d7c938 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/cloud/DistributedQueue.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/cloud/DistributedQueue.java
@@ -19,7 +19,10 @@ package org.apache.solr.client.solrj.cloud;
import java.util.Collection;
import java.util.Map;
import java.util.function.Predicate;
+import org.apache.solr.common.MapWriter;
import org.apache.solr.common.util.Pair;
+import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.KeeperException;
/** Distributed queue component. Methods largely follow those in {@link java.util.Queue}. */
public interface DistributedQueue {
@@ -35,7 +38,11 @@ public interface DistributedQueue {
byte[] take() throws Exception;
- void offer(byte[] data) throws Exception;
+ void offer(byte[] data) throws KeeperException, InterruptedException;
+
+ default void offer(MapWriter mw) throws KeeperException, InterruptedException {
+ offer(Utils.toJSON(mw));
+ }
/** Retrieve statistics about the queue size, operations and their timings. */
Map<String, Object> getStats();
diff --git a/solr/solrj/src/java/org/apache/solr/common/MapWriter.java b/solr/solrj/src/java/org/apache/solr/common/MapWriter.java
index 72146dd1bae..1fefc98d778 100644
--- a/solr/solrj/src/java/org/apache/solr/common/MapWriter.java
+++ b/solr/solrj/src/java/org/apache/solr/common/MapWriter.java
@@ -43,6 +43,14 @@ public interface MapWriter extends MapSerializable, NavigableObject {
void writeMap(EntryWriter ew) throws IOException;
+ default MapWriter append(MapWriter another) {
+ MapWriter m = this;
+ return ew -> {
+ m.writeMap(ew);
+ another.writeMap(ew);
+ };
+ }
+
/**
* An interface to push one entry at a time to the output. The order of the keys is not defined,
* but we assume they are distinct -- don't call {@code put} more than once for the same key.