You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by so...@apache.org on 2020/06/03 19:13:36 UTC
[lucene-solr] 07/47: SOLR-14486: Autoscaling simulation framework
should stop using /clusterstate.json.
This is an automated email from the ASF dual-hosted git repository.
sokolov pushed a commit to branch jira/lucene-8962
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 51c8e076f0e612acd6645873ca176d4dbb572f4e
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Tue May 19 18:52:47 2020 +0200
SOLR-14486: Autoscaling simulation framework should stop using /clusterstate.json.
---
solr/CHANGES.txt | 3 +
.../cloud/autoscaling/sim/SimCloudManager.java | 1 -
.../autoscaling/sim/SimClusterStateProvider.java | 279 ++++++++++++---------
.../sim/SnapshotClusterStateProvider.java | 32 ++-
.../autoscaling/sim/TestSnapshotCloudManager.java | 10 +-
5 files changed, 207 insertions(+), 118 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 2f878d0..7105d5c 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -71,6 +71,9 @@ Other Changes
If you have security concerns or other reasons to disable the Admin UI, you can modify `SOLR_ADMIN_UI_DISABLED`
`solr.in.sh`/`solr.in.cmd` at start. (marcussorealheis)
+* SOLR-14486: Autoscaling simulation framework no longer creates /clusterstate.json (format 1);
+ instead, it creates individual per-collection /state.json files (format 2). (ab)
+
================== 8.6.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
index 9b9352b..aa2d7d0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
@@ -181,7 +181,6 @@ public class SimCloudManager implements SolrCloudManager {
if (distribStateManager == null) {
this.stateManager = new SimDistribStateManager(SimDistribStateManager.createNewRootNode());
// init common paths
- stateManager.makePath(ZkStateReader.CLUSTER_STATE);
stateManager.makePath(ZkStateReader.CLUSTER_PROPS);
stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH);
stateManager.makePath(ZkStateReader.LIVE_NODES_ZKNODE);
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
index df14c76..b76f9b5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
@@ -48,7 +48,9 @@ import java.util.stream.Collectors;
import com.google.common.util.concurrent.AtomicDouble;
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
@@ -98,6 +100,7 @@ import org.apache.solr.core.SolrInfoBean;
import org.apache.solr.metrics.SolrMetricManager;
import org.apache.solr.update.SolrIndexSplitter;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -119,8 +122,8 @@ import static org.apache.solr.common.params.CommonParams.NAME;
* <li>using autoscaling policy for replica placements</li>
* <li>maintaining and up-to-date list of /live_nodes and nodeAdded / nodeLost markers</li>
* <li>running a simulated leader election on collection changes (with throttling), when needed</li>
- * <li>maintaining an up-to-date /clusterstate.json (single file format), which also tracks replica states,
- * leader election changes, replica property changes, etc. Note: this file is only written,
+ * <li>maintaining up-to-date per-collection /state.json files, which also track replica states,
+ * leader election changes, replica property changes, etc. Note: these files are only written,
* but never read by the framework!</li>
* <li>maintaining an up-to-date /clusterprops.json. Note: this file is only written, but never read by the
* framework!</li>
@@ -153,12 +156,131 @@ public class SimClusterStateProvider implements ClusterStateProvider {
private final Map<String, Map<String, Long>> opDelays = new ConcurrentHashMap<>();
- private volatile int clusterStateVersion = 0;
private volatile String overseerLeader = null;
private volatile Map<String, Object> lastSavedProperties = null;
- private final AtomicReference<Map<String, DocCollection>> collectionsStatesRef = new AtomicReference<>();
+ private class CachedCollectionRef {
+ private final String name;
+ private int zkVersion;
+ private DocCollection coll;
+ ReentrantLock lock = new ReentrantLock();
+
+ CachedCollectionRef(String name, int zkVersion) {
+ this.name = name;
+ this.zkVersion = zkVersion;
+ }
+
+ public DocCollection getColl() throws InterruptedException, IOException {
+ DocCollection dc = coll;
+ if (dc != null) {
+ return dc;
+ }
+ lock.lock();
+ try {
+ if (coll != null) {
+ return coll;
+ } else {
+ Map<String, Map<String, Map<String, Replica>>> collMap = new HashMap<>();
+ nodeReplicaMap.forEach((n, replicas) -> {
+ synchronized (replicas) {
+ replicas.forEach(ri -> {
+ if (!ri.getCollection().equals(name)) {
+ return;
+ }
+ Map<String, Object> props;
+ synchronized (ri) {
+ props = new HashMap<>(ri.getVariables());
+ }
+ props.put(ZkStateReader.NODE_NAME_PROP, n);
+ props.put(ZkStateReader.CORE_NAME_PROP, ri.getCore());
+ props.put(ZkStateReader.REPLICA_TYPE, ri.getType().toString());
+ props.put(ZkStateReader.STATE_PROP, ri.getState().toString());
+ Replica r = new Replica(ri.getName(), props, ri.getCollection(), ri.getShard());
+ collMap.computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
+ .computeIfAbsent(ri.getShard(), s -> new HashMap<>())
+ .put(ri.getName(), r);
+ });
+ }
+ });
+
+ // add empty slices
+ sliceProperties.forEach((c, perSliceProps) -> {
+ if (!c.equals(name)) {
+ return;
+ }
+ perSliceProps.forEach((slice, props) -> {
+ collMap.computeIfAbsent(c, co -> new ConcurrentHashMap<>()).computeIfAbsent(slice, s -> new ConcurrentHashMap<>());
+ });
+ });
+ // add empty collections
+ collProperties.keySet().forEach(c -> {
+ if (!c.equals(name)) {
+ return;
+ }
+ collMap.computeIfAbsent(c, co -> new ConcurrentHashMap<>());
+ });
+
+ Map<String, Map<String, Replica>> shards = collMap.get(name);
+ Map<String, Slice> slices = new HashMap<>();
+ shards.forEach((s, replicas) -> {
+ Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(name, c -> new ConcurrentHashMap<>()).computeIfAbsent(s, sl -> new ConcurrentHashMap<>());
+ Slice slice = new Slice(s, replicas, sliceProps, name);
+ slices.put(s, slice);
+ });
+ Map<String, Object> collProps = collProperties.computeIfAbsent(name, c -> new ConcurrentHashMap<>());
+ Map<String, Object> routerProp = (Map<String, Object>) collProps.getOrDefault(DocCollection.DOC_ROUTER, Collections.singletonMap("name", DocRouter.DEFAULT_NAME));
+ DocRouter router = DocRouter.getDocRouter((String)routerProp.getOrDefault("name", DocRouter.DEFAULT_NAME));
+ String path = ZkStateReader.getCollectionPath(name);
+ coll = new DocCollection(name, slices, collProps, router, zkVersion + 1, path);
+ try {
+ SimDistribStateManager stateManager = cloudManager.getSimDistribStateManager();
+ byte[] data = Utils.toJSON(Collections.singletonMap(name, coll));
+ if (!stateManager.hasData(path)) {
+ try {
+ stateManager.makePath(path, data, CreateMode.PERSISTENT, true);
+ } catch (AlreadyExistsException e) {
+ // try updating
+ stateManager.setData(path, data, zkVersion);
+ }
+ } else {
+ stateManager.setData(path, data, zkVersion);
+ }
+ // verify version
+ VersionedData vd = stateManager.getData(path);
+ assert vd.getVersion() == zkVersion + 1;
+ zkVersion++;
+ } catch (KeeperException | BadVersionException e) {
+ // should never happen?
+ throw new RuntimeException("error saving " + coll, e);
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ return coll;
+ }
+
+ public int getZkVersion() {
+ lock.lock();
+ try {
+ return zkVersion;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void invalidate() {
+ lock.lock();
+ try {
+ coll = null;
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ private final Map<String, CachedCollectionRef> collectionsStatesRef = new ConcurrentHashMap<>();
private final Random bulkUpdateRandom = new Random(0);
@@ -207,6 +329,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
sliceProperties.clear();
nodeReplicaMap.clear();
liveNodes.clear();
+ collectionsStatesRef.clear();
for (String nodeId : stateManager.listData(ZkStateReader.LIVE_NODES_ZKNODE)) {
if (stateManager.hasData(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeId)) {
stateManager.removeData(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeId, -1);
@@ -223,6 +346,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
createEphemeralLiveNode(nodeId);
}
initialState.forEachCollection(dc -> {
+ // DocCollection will be created later
+ collectionsStatesRef.put(dc.getName(), new CachedCollectionRef(dc.getName(), dc.getZNodeVersion()));
collProperties.computeIfAbsent(dc.getName(), name -> new ConcurrentHashMap<>()).putAll(dc.getProperties());
opDelays.computeIfAbsent(dc.getName(), Utils.NEW_HASHMAP_FUN).putAll(defaultOpDelays);
dc.getSlices().forEach(s -> {
@@ -248,7 +373,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
});
});
});
- collectionsStatesRef.set(null);
} finally {
lock.unlock();
}
@@ -287,8 +411,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
return nodes.get(random.nextInt(nodes.size()));
}
- // todo: maybe hook up DistribStateManager /clusterstate.json watchers?
-
private ReplicaInfo getReplicaInfo(Replica r) {
final List<ReplicaInfo> list = nodeReplicaMap.computeIfAbsent
(r.getNodeName(), Utils.NEW_SYNCHRONIZED_ARRAYLIST_FUN);
@@ -331,8 +453,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
// mark every replica on that node as down
boolean res = liveNodes.remove(nodeId);
setReplicaStates(nodeId, Replica.State.DOWN, collections);
- if (!collections.isEmpty()) {
- collectionsStatesRef.set(null);
+ for (String collection : collections) {
+ collectionsStatesRef.get(collection).invalidate();;
}
// remove ephemeral nodes
stateManager.getRoot().removeEphemeralChildren(nodeId);
@@ -363,7 +485,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
try {
Set<String> myNodes = new HashSet<>(nodeReplicaMap.keySet());
myNodes.removeAll(liveNodes.get());
- collectionsStatesRef.set(null);
} finally {
lock.unlock();
}
@@ -452,7 +573,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
try {
setReplicaStates(nodeId, Replica.State.ACTIVE, collections);
if (!collections.isEmpty()) {
- collectionsStatesRef.set(null);
+ collections.forEach(c -> collectionsStatesRef.get(c).invalidate());
simRunLeaderElection(collections, true);
return true;
} else {
@@ -604,7 +725,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
cloudManager.getMetricManager().registerGauge(null, registry,
() -> replicaSize, "", true, Type.CORE_IDX.metricsAttribute);
// at this point nuke our cached DocCollection state
- collectionsStatesRef.set(null);
+ collectionsStatesRef.get(replicaInfo.getCollection()).invalidate();
log.trace("-- simAddReplica {}", replicaInfo);
if (runLeaderElection) {
simRunLeaderElection(replicaInfo.getCollection(), replicaInfo.getShard(), true);
@@ -633,7 +754,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
colShardReplicaMap.computeIfAbsent(ri.getCollection(), c -> new ConcurrentHashMap<>())
.computeIfAbsent(ri.getShard(), s -> new ArrayList<>())
.remove(ri);
- collectionsStatesRef.set(null);
+ collectionsStatesRef.get(ri.getCollection()).invalidate();
opDelay(ri.getCollection(), CollectionParams.CollectionAction.DELETEREPLICA.name());
@@ -669,26 +790,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
/**
- * Save clusterstate.json to {@link DistribStateManager}.
- * @return saved state
- */
- private ClusterState saveClusterState(ClusterState state) throws IOException {
- ensureNotClosed();
- byte[] data = Utils.toJSON(state);
- try {
- VersionedData oldData = stateManager.getData(ZkStateReader.CLUSTER_STATE);
- int version = oldData != null ? oldData.getVersion() : 0;
- assert clusterStateVersion == version : "local clusterStateVersion out of sync";
- stateManager.setData(ZkStateReader.CLUSTER_STATE, data, version);
- log.debug("** saved cluster state version {}", version);
- clusterStateVersion++;
- } catch (Exception e) {
- throw new IOException(e);
- }
- return state;
- }
-
- /**
* Delay an operation by a configured amount.
* @param collection collection name
* @param op operation name.
@@ -725,7 +826,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
if (saveClusterState) {
lock.lockInterruptibly();
try {
- collectionsStatesRef.set(null);
+ collections.forEach(c -> collectionsStatesRef.get(c).invalidate());
} finally {
lock.unlock();
}
@@ -865,13 +966,13 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
if (log.isDebugEnabled()) {
log.debug("-- elected new leader for {} / {} (currentVersion={}): {}", collection,
- s.getName(), clusterStateVersion, ri);
+ s.getName(), col.getZNodeVersion(), ri);
}
stateChanged.set(true);
}
} finally {
if (stateChanged.get() || saveState) {
- collectionsStatesRef.set(null);
+ collectionsStatesRef.get(collection).invalidate();
}
lock.unlock();
}
@@ -889,7 +990,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
boolean waitForFinalState = props.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
final String collectionName = props.getStr(NAME);
- log.debug("-- simCreateCollection {}, currentVersion={}", collectionName, clusterStateVersion);
+ log.debug("-- simCreateCollection {}", collectionName);
String router = props.getStr("router.name", DocRouter.DEFAULT_NAME);
String policy = props.getStr(Policy.POLICY);
@@ -903,12 +1004,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
CreateCollectionCmd.checkReplicaTypes(props);
// always force getting fresh state
- lock.lockInterruptibly();
- try {
- collectionsStatesRef.set(null);
- } finally {
- lock.unlock();
- }
final ClusterState clusterState = getClusterState();
String withCollection = props.getStr(CollectionAdminParams.WITH_COLLECTION);
@@ -962,8 +1057,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
CollectionAdminParams.COLOCATED_WITH, collectionName);
cmd = new CollectionMutator(cloudManager).modifyCollection(clusterState,message);
}
- // force recreation of collection states
- collectionsStatesRef.set(null);
+ collectionsStatesRef.put(collectionName, new CachedCollectionRef(collectionName, 0));
} finally {
lock.unlock();
@@ -1043,7 +1137,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
// force recreation of collection states
lock.lockInterruptibly();
try {
- collectionsStatesRef.set(null);
+ collectionsStatesRef.get(collectionName).invalidate();
} finally {
lock.unlock();
}
@@ -1057,7 +1151,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
}
results.add("success", "");
- log.debug("-- finished createCollection {}, currentVersion={}", collectionName, clusterStateVersion);
+ log.debug("-- finished createCollection {}", collectionName);
}
/**
@@ -1106,7 +1200,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
}
});
- collectionsStatesRef.set(null);
+ cloudManager.getDistribStateManager().removeRecursively(ZkStateReader.getCollectionPath(collection), true, true);
+ collectionsStatesRef.remove(collection);
results.add("success", "");
} catch (Exception e) {
log.warn("Exception", e);
@@ -1121,7 +1216,13 @@ public class SimClusterStateProvider implements ClusterStateProvider {
public void simDeleteAllCollections() throws Exception {
lock.lockInterruptibly();
try {
- collectionsStatesRef.set(null);
+ collectionsStatesRef.keySet().forEach(name -> {
+ try {
+ cloudManager.getDistribStateManager().removeRecursively(ZkStateReader.getCollectionPath(name), true, true);
+ } catch (Exception e) {
+ log.error("Unable to delete collection state.json");
+ }
+ });
collProperties.clear();
sliceProperties.clear();
@@ -1468,7 +1569,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
// invalidate cached state
- collectionsStatesRef.set(null);
+ collectionsStatesRef.get(collectionName).invalidate();
} finally {
SplitShardCmd.unlockForSplit(cloudManager, collectionName, sliceName.get());
lock.unlock();
@@ -1516,7 +1617,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
}
});
- collectionsStatesRef.set(null);
+ collectionsStatesRef.get(collectionName).invalidate();
results.add("success", "");
} catch (Exception e) {
results.add("failure", e.toString());
@@ -2004,7 +2105,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
props.clear();
props.putAll(properties);
}
- collectionsStatesRef.set(null);
+ collectionsStatesRef.get(coll).invalidate();
} finally {
lock.unlock();
}
@@ -2025,7 +2126,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
} else {
props.put(key, value);
}
- collectionsStatesRef.set(null);
+ collectionsStatesRef.get(coll).invalidate();
} finally {
lock.unlock();
}
@@ -2046,7 +2147,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
if (properties != null) {
sliceProps.putAll(properties);
}
- collectionsStatesRef.set(null);
+ collectionsStatesRef.get(coll).invalidate();
} finally {
lock.unlock();
}
@@ -2247,7 +2348,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
lock.lockInterruptibly();
try {
final Map<String, Map<String, Object>> stats = new TreeMap<>();
- collectionsStatesRef.set(null);
ClusterState state = getClusterState();
state.forEachCollection(coll -> {
Map<String, Object> perColl = new LinkedHashMap<>();
@@ -2286,7 +2386,9 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
continue;
}
- AtomicLong buffered = (AtomicLong)sliceProperties.get(coll.getName()).get(s.getName()).get(BUFFERED_UPDATES);
+ AtomicLong buffered = (AtomicLong)sliceProperties
+ .getOrDefault(coll.getName(), Collections.emptyMap())
+ .getOrDefault(s.getName(), Collections.emptyMap()).get(BUFFERED_UPDATES);
if (buffered != null) {
bufferedDocs += buffered.get();
}
@@ -2389,7 +2491,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
lock.lockInterruptibly();
try {
Map<String, DocCollection> states = getCollectionStates();
- ClusterState state = new ClusterState(clusterStateVersion, liveNodes.get(), states);
+ ClusterState state = new ClusterState(0, liveNodes.get(), states);
return state;
} finally {
lock.unlock();
@@ -2399,65 +2501,18 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
}
- // this method uses a simple cache in collectionsStatesRef. Operations that modify
- // cluster state should always reset this cache so that the changes become visible
private Map<String, DocCollection> getCollectionStates() throws IOException, InterruptedException {
lock.lockInterruptibly();
try {
- Map<String, DocCollection> collectionStates = collectionsStatesRef.get();
- if (collectionStates != null) {
- return collectionStates;
- }
- collectionsStatesRef.set(null);
- log.debug("** creating new collection states, currentVersion={}", clusterStateVersion);
- Map<String, Map<String, Map<String, Replica>>> collMap = new HashMap<>();
- nodeReplicaMap.forEach((n, replicas) -> {
- synchronized (replicas) {
- replicas.forEach(ri -> {
- Map<String, Object> props;
- synchronized (ri) {
- props = new HashMap<>(ri.getVariables());
- }
- props.put(ZkStateReader.NODE_NAME_PROP, n);
- props.put(ZkStateReader.CORE_NAME_PROP, ri.getCore());
- props.put(ZkStateReader.REPLICA_TYPE, ri.getType().toString());
- props.put(ZkStateReader.STATE_PROP, ri.getState().toString());
- Replica r = new Replica(ri.getName(), props, ri.getCollection(), ri.getShard());
- collMap.computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
- .computeIfAbsent(ri.getShard(), s -> new HashMap<>())
- .put(ri.getName(), r);
- });
- }
- });
-
- // add empty slices
- sliceProperties.forEach((c, perSliceProps) -> {
- perSliceProps.forEach((slice, props) -> {
- collMap.computeIfAbsent(c, co -> new ConcurrentHashMap<>()).computeIfAbsent(slice, s -> new ConcurrentHashMap<>());
- });
- });
- // add empty collections
- collProperties.keySet().forEach(c -> {
- collMap.computeIfAbsent(c, co -> new ConcurrentHashMap<>());
- });
-
- Map<String, DocCollection> res = new HashMap<>();
- collMap.forEach((coll, shards) -> {
- Map<String, Slice> slices = new HashMap<>();
- shards.forEach((s, replicas) -> {
- Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(coll, c -> new ConcurrentHashMap<>()).computeIfAbsent(s, sl -> new ConcurrentHashMap<>());
- Slice slice = new Slice(s, replicas, sliceProps, coll);
- slices.put(s, slice);
- });
- Map<String, Object> collProps = collProperties.computeIfAbsent(coll, c -> new ConcurrentHashMap<>());
- Map<String, Object> routerProp = (Map<String, Object>) collProps.getOrDefault(DocCollection.DOC_ROUTER, Collections.singletonMap("name", DocRouter.DEFAULT_NAME));
- DocRouter router = DocRouter.getDocRouter((String)routerProp.getOrDefault("name", DocRouter.DEFAULT_NAME));
- DocCollection dc = new DocCollection(coll, slices, collProps, router, clusterStateVersion, ZkStateReader.CLUSTER_STATE);
- res.put(coll, dc);
+ Map<String, DocCollection> collectionStates = new HashMap<>();
+ collectionsStatesRef.forEach((name, cached) -> {
+ try {
+ collectionStates.put(name, cached.getColl());
+ } catch (Exception e) {
+ throw new RuntimeException("error building collection " + name + " state", e);
+ }
});
- saveClusterState(new ClusterState(clusterStateVersion, liveNodes.get(), res));
- collectionsStatesRef.set(res);
- return res;
+ return collectionStates;
} finally {
lock.unlock();
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotClusterStateProvider.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotClusterStateProvider.java
index 3655fb3..351265d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotClusterStateProvider.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/sim/SnapshotClusterStateProvider.java
@@ -21,6 +21,7 @@ import java.io.UnsupportedEncodingException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -55,8 +56,27 @@ public class SnapshotClusterStateProvider implements ClusterStateProvider {
liveNodes = Set.copyOf((Collection<String>)snapshot.getOrDefault("liveNodes", Collections.emptySet()));
clusterProperties = (Map<String, Object>)snapshot.getOrDefault("clusterProperties", Collections.emptyMap());
Map<String, Object> stateMap = new HashMap<>((Map<String, Object>)snapshot.getOrDefault("clusterState", Collections.emptyMap()));
- Number version = (Number)stateMap.remove("version");
- clusterState = ClusterState.load(version != null ? version.intValue() : null, stateMap, liveNodes, ZkStateReader.CLUSTER_STATE);
+ Map<String, DocCollection> collectionStates = new HashMap<>();
+ // back-compat with format = 1
+ Integer stateVersion = Integer.valueOf(String.valueOf(stateMap.getOrDefault("version", 0)));
+ stateMap.remove("version");
+ stateMap.forEach((name, state) -> {
+ Map<String, Object> mutableState = (Map<String, Object>)state;
+ Map<String, Object> collMap = (Map<String, Object>) mutableState.get(name);
+ if (collMap == null) {
+ // snapshot in format 1
+ collMap = mutableState;
+ mutableState = Collections.singletonMap(name, state);
+ }
+ Integer version = Integer.parseInt(String.valueOf(collMap.getOrDefault("zNodeVersion", stateVersion)));
+ String path = String.valueOf(collMap.getOrDefault("zNode", ZkStateReader.getCollectionPath(name)));
+ collMap.remove("zNodeVersion");
+ collMap.remove("zNode");
+ byte[] data = Utils.toJSON(mutableState);
+ ClusterState collState = ClusterState.load(version, data, Collections.emptySet(), path);
+ collectionStates.put(name, collState.getCollection(name));
+ });
+ clusterState = new ClusterState(stateVersion, liveNodes, collectionStates);
}
public Map<String, Object> getSnapshot() {
@@ -67,14 +87,18 @@ public class SnapshotClusterStateProvider implements ClusterStateProvider {
}
Map<String, Object> stateMap = new HashMap<>();
snapshot.put("clusterState", stateMap);
- stateMap.put("version", clusterState.getZNodeVersion());
clusterState.forEachCollection(coll -> {
CharArr out = new CharArr();
JSONWriter writer = new JSONWriter(out, 2);
coll.write(writer);
String json = out.toString();
try {
- stateMap.put(coll.getName(), Utils.fromJSON(json.getBytes("UTF-8")));
+ Map<String, Object> collMap = new LinkedHashMap<>((Map<String, Object>)Utils.fromJSON(json.getBytes("UTF-8")));
+ collMap.put("zNodeVersion", coll.getZNodeVersion());
+ collMap.put("zNode", coll.getZNode());
+ // format compatible with the real /state.json, which uses a mini-ClusterState
+ // consisting of a single collection
+ stateMap.put(coll.getName(), Collections.singletonMap(coll.getName(), collMap));
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("should not happen!", e);
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSnapshotCloudManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSnapshotCloudManager.java
index 6e7f0ea..876c750 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSnapshotCloudManager.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestSnapshotCloudManager.java
@@ -229,8 +229,8 @@ public class TestSnapshotCloudManager extends SolrCloudTestCase {
Pattern.compile("/autoscaling/triggerState/.*"),
// some triggers may have run after the snapshot was taken
Pattern.compile("/autoscaling/events/.*"),
- // we always use format 1 in SimClusterStateProvider
Pattern.compile("/clusterstate\\.json"),
+ Pattern.compile("/collections/[^/]+?/state.json"),
// depending on the startup sequence leaders may differ
Pattern.compile("/collections/[^/]+?/leader_elect/.*"),
Pattern.compile("/collections/[^/]+?/leaders/.*"),
@@ -255,6 +255,14 @@ public class TestSnapshotCloudManager extends SolrCloudTestCase {
.filter(STATE_FILTER_FUN).collect(Collectors.toList()));
Collections.sort(treeOne);
Collections.sort(treeTwo);
+ if (!treeOne.equals(treeTwo)) {
+ List<String> t1 = new ArrayList<>(treeOne);
+ t1.removeAll(treeTwo);
+ log.warn("Only in tree one: " + t1);
+ List<String> t2 = new ArrayList<>(treeTwo);
+ t2.removeAll(treeOne);
+ log.warn("Only in tree two: " + t2);
+ }
assertEquals(treeOne, treeTwo);
for (String path : treeOne) {
VersionedData vd1 = one.getData(path);