You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2021/01/17 01:00:51 UTC
[lucene-solr] branch jira/solr_13951 updated: merging with 8x
This is an automated email from the ASF dual-hosted git repository.
noble pushed a commit to branch jira/solr_13951
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/jira/solr_13951 by this push:
new 290ea6a merging with 8x
290ea6a is described below
commit 290ea6a14a74eff9e9d1dd3bca1bf8188d9cf9b3
Author: noblepaul <no...@gmail.com>
AuthorDate: Sun Jan 17 12:00:23 2021 +1100
merging with 8x
---
.../java/org/apache/solr/cloud/ZkController.java | 29 +-
.../OverseerCollectionMessageHandler.java | 1 +
.../apache/solr/cloud/overseer/NodeMutator.java | 6 +-
.../apache/solr/cloud/overseer/ReplicaMutator.java | 8 +-
.../apache/solr/cloud/overseer/SliceMutator.java | 25 +-
.../apache/solr/cloud/overseer/ZkStateWriter.java | 6 +-
.../apache/solr/cloud/overseer/ZkWriteCommand.java | 8 +-
solr/solr-ref-guide/src/collection-management.adoc | 3 +
.../apache/solr/common/cloud/PerReplicaStates.java | 705 +++++++--------------
.../solr/common/cloud/PerReplicaStatesOps.java | 303 +++++++++
.../apache/solr/common/cloud/ZkStateReader.java | 10 +-
.../resources/apispec/collections.Commands.json | 5 +
.../client/solrj/impl/CloudSolrClientTest.java | 20 +-
.../solr/common/cloud/TestPerReplicaStates.java | 200 +++---
14 files changed, 707 insertions(+), 622 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 28fcf73..ee4da55 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1589,12 +1589,39 @@ public class ZkController implements Closeable {
if (updateLastState) {
cd.getCloudDescriptor().setLastPublished(state);
}
- overseerJobQueue.offer(Utils.toJSON(m));
+ DocCollection coll = zkStateReader.getCollection(collection);
+ if (forcePublish || sendToOverseer(coll, coreNodeName)) {
+ overseerJobQueue.offer(Utils.toJSON(m));
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("bypassed overseer for message : {}", Utils.toJSONString(m));
+ }
+ PerReplicaStates perReplicaStates = PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
+ PerReplicaStatesOps.flipState(coreNodeName, state, perReplicaStates)
+ .persist(coll.getZNode(), zkClient);
+ }
} finally {
MDCLoggingContext.clear();
}
}
+ /**
+ * Whether a message needs to be sent to overseer or not
+ */
+ static boolean sendToOverseer(DocCollection coll, String replicaName) {
+ if (coll == null) return true;
+ Replica r = coll.getReplica(replicaName);
+ if (r == null) return true;
+ Slice shard = coll.getSlice(r.shard);
+ if (shard == null) return true;//very unlikely
+ if (shard.getState() == Slice.State.RECOVERY) return true;
+ if (shard.getParent() != null) return true;
+ for (Slice slice : coll.getSlices()) {
+ if (Objects.equals(shard.getName(), slice.getParent())) return true;
+ }
+ return false;
+ }
+
public ZkShardTerms getShardTerms(String collection, String shardId) {
return getCollectionTerms(collection).getShard(shardId);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 4150842..f56a9f4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -142,6 +142,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler,
ZkStateReader.REPLICATION_FACTOR, "1",
ZkStateReader.NRT_REPLICAS, "1",
ZkStateReader.TLOG_REPLICAS, "0",
+ DocCollection.PER_REPLICA_STATE, null,
ZkStateReader.PULL_REPLICAS, "0"));
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
index 77f0550..e8db2b4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
@@ -26,7 +26,7 @@ import java.util.Map.Entry;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.PerReplicaStates;
+import org.apache.solr.common.cloud.PerReplicaStatesOps;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -80,9 +80,9 @@ public class NodeMutator {
}
if (needToUpdateCollection) {
- if(docCollection.isPerReplicaState()) {
+ if (docCollection.isPerReplicaState()) {
zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy),
- PerReplicaStates.WriteOps.downReplicas(downedReplicas, docCollection.getPerReplicaStates()), false));
+ PerReplicaStatesOps.downReplicas(downedReplicas, docCollection.getPerReplicaStates()), false));
} else {
zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy)));
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
index 747cf90..ecf06e4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
@@ -39,6 +39,7 @@ import org.apache.solr.cloud.api.collections.SplitShardCmd;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStatesOps;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.Slice;
@@ -309,9 +310,8 @@ public class ReplicaMutator {
Slice slice = collection != null ? collection.getSlice(sliceName) : null;
Map<String, Object> replicaProps = new LinkedHashMap<>(message.getProperties());
- Replica oldReplica = null;
if (slice != null) {
- oldReplica = slice.getReplica(coreNodeName);
+ Replica oldReplica = slice.getReplica(coreNodeName);
if (oldReplica != null) {
if (oldReplica.containsKey(ZkStateReader.LEADER_PROP)) {
replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP));
@@ -376,9 +376,9 @@ public class ReplicaMutator {
DocCollection newCollection = CollectionMutator.updateSlice(collectionName, collection, slice);
log.debug("Collection is now: {}", newCollection);
- if(collection != null && collection.isPerReplicaState()) {
+ if (collection != null && collection.isPerReplicaState()) {
PerReplicaStates prs = PerReplicaStates.fetch(collection.getZNode(), zkClient, collection.getPerReplicaStates());
- return new ZkWriteCommand(collectionName, newCollection, PerReplicaStates.WriteOps.flipState(replica.getName(), replica.getState(), prs), persistCollectionState);
+ return new ZkWriteCommand(collectionName, newCollection, PerReplicaStatesOps.flipState(replica.getName(), replica.getState(), prs), persistCollectionState);
} else{
return new ZkWriteCommand(collectionName, newCollection);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
index 0d95993..019339e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
@@ -31,6 +31,7 @@ import org.apache.solr.cloud.api.collections.Assign;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStatesOps;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.RoutingRule;
@@ -95,13 +96,14 @@ public class SliceMutator {
ZkStateReader.NODE_NAME_PROP, message.getStr(ZkStateReader.NODE_NAME_PROP),
ZkStateReader.REPLICA_TYPE, message.get(ZkStateReader.REPLICA_TYPE)), coll, slice);
- if(collection.isPerReplicaState()) {
+ if (collection.isPerReplicaState()) {
PerReplicaStates prs = PerReplicaStates.fetch(collection.getZNode(), zkClient, collection.getPerReplicaStates());
return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica),
- PerReplicaStates.WriteOps.addReplica(replica.getName(), replica.getState(), replica.isLeader(), prs), true);
+ PerReplicaStatesOps.addReplica(replica.getName(), replica.getState(), replica.isLeader(), prs), true);
} else {
return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica));
}
+
}
public ZkWriteCommand removeReplica(ClusterState clusterState, ZkNodeProps message) {
@@ -127,7 +129,12 @@ public class SliceMutator {
newSlices.put(slice.getName(), slice);
}
- return new ZkWriteCommand(collection, coll.copyWithSlices(newSlices));
+
+ if (coll.isPerReplicaState()) {
+ return new ZkWriteCommand(collection, coll.copyWithSlices(newSlices), PerReplicaStatesOps.deleteReplica(cnn, coll.getPerReplicaStates()) , true);
+ } else {
+ return new ZkWriteCommand(collection, coll.copyWithSlices(newSlices));
+ }
}
public ZkWriteCommand setShardLeader(ClusterState clusterState, ZkNodeProps message) {
@@ -154,7 +161,7 @@ public class SliceMutator {
if (replica == oldLeader && !coreURL.equals(leaderUrl)) {
replica = new ReplicaMutator(cloudManager).unsetLeader(replica);
} else if (coreURL.equals(leaderUrl)) {
- newLeader= replica = new ReplicaMutator(cloudManager).setLeader(replica);
+ newLeader = replica = new ReplicaMutator(cloudManager).setLeader(replica);
}
newReplicas.put(replica.getName(), replica);
@@ -163,13 +170,13 @@ public class SliceMutator {
Map<String, Object> newSliceProps = slice.shallowCopy();
newSliceProps.put(Slice.REPLICAS, newReplicas);
slice = new Slice(slice.getName(), newReplicas, slice.getProperties(), collectionName);
- if(coll.isPerReplicaState()) {
+ if (coll.isPerReplicaState()) {
PerReplicaStates prs = PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
return new ZkWriteCommand(collectionName, CollectionMutator.updateSlice(collectionName, coll, slice),
- PerReplicaStates.WriteOps.flipLeader(
- slice.getReplicaNames(),
- newLeader == null ? null : newLeader.getName(),
- prs), false);
+ PerReplicaStatesOps.flipLeader(
+ slice.getReplicaNames(),
+ newLeader == null ? null : newLeader.getName(),
+ prs), false);
} else {
return new ZkWriteCommand(collectionName, CollectionMutator.updateSlice(collectionName, coll, slice));
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 26e100f..1ebccd2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -221,11 +221,11 @@ public class ZkStateWriter {
DocCollection c = cmd.collection;
if(cmd.ops != null && cmd.ops.isPreOp()) {
- PerReplicaStates.persist(cmd.ops, path, reader.getZkClient());
+ cmd.ops.persist(path, reader.getZkClient());
clusterState = clusterState.copyWith(name,
cmd.collection.copyWith(PerReplicaStates.fetch(cmd.collection.getZNode(), reader.getZkClient(), null)));
}
- if(!cmd.persistCollState) continue;
+ if (!cmd.persistCollState) continue;
if (c == null) {
// let's clean up the state.json of this collection only, the rest should be clean by delete collection cmd
log.debug("going to delete state.json {}", path);
@@ -247,7 +247,7 @@ public class ZkStateWriter {
}
}
if(cmd.ops != null && !cmd.ops.isPreOp()) {
- PerReplicaStates.persist(cmd.ops, path, reader.getZkClient());
+ cmd.ops.persist(path, reader.getZkClient());
DocCollection currentCollState = clusterState.getCollection(cmd.name);
if ( currentCollState != null) {
clusterState = clusterState.copyWith(name,
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
index 2f71674..39d953c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
@@ -17,7 +17,7 @@
package org.apache.solr.cloud.overseer;
import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.PerReplicaStates;
+import org.apache.solr.common.cloud.PerReplicaStatesOps;
public class ZkWriteCommand {
@@ -27,9 +27,9 @@ public class ZkWriteCommand {
public final boolean noop;
// persist the collection state. If this is false, it means the collection state is not modified
public final boolean persistCollState;
- public final PerReplicaStates.WriteOps ops;
+ public final PerReplicaStatesOps ops;
- public ZkWriteCommand(String name, DocCollection collection, PerReplicaStates.WriteOps replicaOps, boolean persistCollState) {
+ public ZkWriteCommand(String name, DocCollection collection, PerReplicaStatesOps replicaOps, boolean persistCollState) {
boolean isPerReplicaState = collection.isPerReplicaState();
this.name = name;
this.collection = collection;
@@ -43,7 +43,7 @@ public class ZkWriteCommand {
this.noop = false;
persistCollState = true;
this.ops = collection != null && collection.isPerReplicaState() ?
- PerReplicaStates.WriteOps.touchChildren():
+ PerReplicaStatesOps.touchChildren():
null;
}
diff --git a/solr/solr-ref-guide/src/collection-management.adoc b/solr/solr-ref-guide/src/collection-management.adoc
index 14e32b6..e1ee0b4 100644
--- a/solr/solr-ref-guide/src/collection-management.adoc
+++ b/solr/solr-ref-guide/src/collection-management.adoc
@@ -87,6 +87,9 @@ If this parameter is specified, the router will look at the value of the field i
+
Please note that <<realtime-get.adoc#realtime-get,RealTime Get>> or retrieval by document ID would also require the parameter `\_route_` (or `shard.keys`) to avoid a distributed search.
+`perReplicaState`::
+If `true`, the state of each replica will be maintained as an individual child node of `state.json`. Default is `false`.
+
`property._name_=_value_`::
Set core property _name_ to _value_. See the section <<defining-core-properties.adoc#defining-core-properties,Defining core.properties>> for details on supported properties and values.
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
index 3eb2ca2..af4bb43 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
@@ -36,14 +36,11 @@ import org.apache.solr.common.annotation.JsonProperty;
import org.apache.solr.common.util.ReflectMapWriter;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.WrappedSimpleMap;
-import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static java.util.Collections.singletonList;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.params.CommonParams.VERSION;
@@ -52,536 +49,264 @@ import static org.apache.solr.common.params.CommonParams.VERSION;
* This is an immutable object. When states are modified, a new instance is constructed
*/
public class PerReplicaStates implements ReflectMapWriter {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- public static final char SEPARATOR = ':';
-
-
- @JsonProperty
- public final String path;
-
- @JsonProperty
- public final int cversion;
-
- @JsonProperty
- public final SimpleMap<State> states;
-
- public PerReplicaStates(String path, int cversion, List<String> states) {
- this.path = path;
- this.cversion = cversion;
- Map<String, State> tmp = new LinkedHashMap<>();
-
- for (String state : states) {
- State rs = State.parse(state);
- if (rs == null) continue;
- State existing = tmp.get(rs.replica);
- if (existing == null) {
- tmp.put(rs.replica, rs);
- } else {
- tmp.put(rs.replica, rs.insert(existing));
- }
- }
- this.states = new WrappedSimpleMap<>(tmp);
-
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ public static final char SEPARATOR = ':';
+ // number of times to retry in case of a CAS failure
+ public static final int MAX_RETRIES = 5;
+
+
+ // znode path from which this is loaded
+ @JsonProperty
+ public final String path;
+
+ // the child version of that znode
+ @JsonProperty
+ public final int cversion;
+
+ //states of individual replicas
+ @JsonProperty
+ public final SimpleMap<State> states;
+
+ /**
+ * Construct with data read from ZK
+ * @param path path from where this is loaded
+ * @param cversion the current child version of the znode
+ * @param states the per-replica states (the list of all child nodes)
+ */
+ public PerReplicaStates(String path, int cversion, List<String> states) {
+ this.path = path;
+ this.cversion = cversion;
+ Map<String, State> tmp = new LinkedHashMap<>();
+
+ for (String state : states) {
+ State rs = State.parse(state);
+ if (rs == null) continue;
+ State existing = tmp.get(rs.replica);
+ if (existing == null) {
+ tmp.put(rs.replica, rs);
+ } else {
+ tmp.put(rs.replica, rs.insert(existing));
+ }
}
+ this.states = new WrappedSimpleMap<>(tmp);
- /**Get the changed replicas
- */
- public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {
- Set<String> result = new HashSet<>();
- if(fresh == null) {
- old.states.forEachKey(result::add);
- return result;
- }
- old.states.forEachEntry((s, state) -> {
- //the state is modified or missing
- if(!Objects.equals(fresh.get(s) , state)) result.add(s);
- });
- fresh.states.forEachEntry((s, state) -> { if(old.get(s) == null ) result.add(s);
- });
- return result;
- }
+ }
- /**
- * This is a persist operation with retry if a write fails due to stale state
- */
- public static void persist(WriteOps ops, String znode, SolrZkClient zkClient) throws KeeperException, InterruptedException {
- try {
- persist(ops.get(), znode, zkClient);
- } catch (KeeperException.NodeExistsException | KeeperException.NoNodeException e) {
- //state is stale
- log.info("stale state for {} . retrying...", znode);
- List<Op> freshOps = ops.get(PerReplicaStates.fetch(znode, zkClient, null));
- persist(freshOps, znode, zkClient);
- log.info("retried for stale state {}, succeeded", znode);
- }
+ /**Get the changed replicas
+ */
+ public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {
+ Set<String> result = new HashSet<>();
+ if (fresh == null) {
+ old.states.forEachKey(result::add);
+ return result;
}
-
- /**
- * Persist a set of operations to Zookeeper
- */
- public static void persist(List<Op> operations, String znode, SolrZkClient zkClient) throws KeeperException, InterruptedException {
- if (operations == null || operations.isEmpty()) return;
- log.debug("Per-replica state being persisted for :{}, ops: {}", znode, operations);
-
- List<org.apache.zookeeper.Op> ops = new ArrayList<>(operations.size());
- for (Op op : operations) {
- //the state of the replica is being updated
- String path = znode + "/" + op.state.asString;
- List<ACL> acls = zkClient.getZkACLProvider().getACLsToAdd(path);
- ops.add(op.typ == Op.Type.ADD ?
- org.apache.zookeeper.Op.create(path, null, acls, CreateMode.PERSISTENT) :
- org.apache.zookeeper.Op.delete(path, -1));
- }
- try {
- zkClient.multi(ops, true);
- if (log.isDebugEnabled()) {
- //nocommit
- try {
- Stat stat = zkClient.exists(znode, null, true);
- log.debug("After update, cversion : {}", stat.getCversion());
- } catch (Exception e) {
- }
-
- }
- } catch (KeeperException e) {
- log.error("multi op exception : " + e.getMessage() + zkClient.getChildren(znode, null, true));
- throw e;
- }
-
+ old.states.forEachEntry((s, state) -> {
+ // the state is modified or missing
+ if (!Objects.equals(fresh.get(s) , state)) result.add(s);
+ });
+ fresh.states.forEachEntry((s, state) -> { if (old.get(s) == null ) result.add(s);
+ });
+ return result;
+ }
+
+
+ /**
+ * Fetch the latest {@link PerReplicaStates}. It fetches data after checking the {@link Stat#getCversion()} of state.json.
+ * If this is not modified, the same object is returned
+ */
+ public static PerReplicaStates fetch(String path, SolrZkClient zkClient, PerReplicaStates current) {
+ try {
+ if (current != null) {
+ Stat stat = zkClient.exists(current.path, null, true);
+ if (stat == null) return new PerReplicaStates(path, -1, Collections.emptyList());
+ if (current.cversion == stat.getCversion()) return current;// not modified
+ }
+ Stat stat = new Stat();
+ List<String> children = zkClient.getChildren(path, null, stat, true);
+ return new PerReplicaStates(path, stat.getCversion(), Collections.unmodifiableList(children));
+ } catch (KeeperException e) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error fetching per-replica states", e);
+ } catch (InterruptedException e) {
+ SolrZkClient.checkInterrupted(e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Thread interrupted when loading per-replica states from " + path, e);
}
+ }
- /**
- * Fetch the latest {@link PerReplicaStates} . It fetches data after checking the {@link Stat#getCversion()} of state.json.
- * If this is not modified, the same object is returned
- */
- public static PerReplicaStates fetch(String path, SolrZkClient zkClient, PerReplicaStates current) {
- try {
- if (current != null) {
- Stat stat = zkClient.exists(current.path, null, true);
- if (stat == null) return new PerReplicaStates(path, -1, Collections.emptyList());
- if (current.cversion == stat.getCversion()) return current;// not modifiedZkStateReaderTest
- }
- Stat stat = new Stat();
- List<String> children = zkClient.getChildren(path, null, stat, true);
- return new PerReplicaStates(path, stat.getCversion(), Collections.unmodifiableList(children));
- } catch (KeeperException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error fetching per-replica states", e);
- } catch (InterruptedException e) {
- SolrZkClient.checkInterrupted(e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Thread interrupted when loading per-replica states from " + path, e);
- }
+ public static String getReplicaName(String s) {
+ int idx = s.indexOf(SEPARATOR);
+ if (idx > 0) {
+ return s.substring(0, idx);
}
+ return null;
+ }
+ public State get(String replica) {
+ return states.get(replica);
+ }
- private static List<Op> addDeleteStaleNodes(List<Op> ops, State rs) {
- while (rs != null) {
- ops.add(new Op(Op.Type.DELETE, rs));
- rs = rs.duplicate;
- }
- return ops;
- }
-
- public static String getReplicaName(String s) {
- int idx = s.indexOf(SEPARATOR);
- if (idx > 0) {
- return s.substring(0, idx);
- }
- return null;
- }
+ public static class Operation {
+ public final Type typ;
+ public final State state;
- public State get(String replica) {
- return states.get(replica);
+ public Operation(Type typ, State replicaState) {
+ this.typ = typ;
+ this.state = replicaState;
}
- public static class Op {
- public final Type typ;
- public final State state;
-
- public Op(Type typ, State replicaState) {
- this.typ = typ;
- this.state = replicaState;
- }
-
-
- public enum Type {
- //add a new node
- ADD,
- //delete an existing node
- DELETE
- }
- @Override
- public String toString() {
- return typ.toString() + " : " + state;
- }
+ public enum Type {
+ //add a new node
+ ADD,
+ //delete an existing node
+ DELETE
}
+ @Override
+ public String toString() {
+ return typ.toString() + " : " + state;
+ }
+ }
- /**
- * The state of a replica as stored as a node under /collections/collection-name/state.json/replica-state
- */
- public static class State implements MapWriter {
-
- public final String replica;
-
- public final Replica.State state;
-
- public final Boolean isLeader;
-
- public final int version;
-
- public final String asString;
-
- /**
- * if there are multiple entries for the same replica, e.g: core_node_1:12:A core_node_1:13:D
- * <p>
- * the entry with '13' is the latest and the one with '12' is considered a duplicate
- * <p>
- * These are unlikely, but possible
- */
- final State duplicate;
-
- private State(String serialized, List<String> pieces) {
- this.asString = serialized;
- replica = pieces.get(0);
- version = Integer.parseInt(pieces.get(1));
- String encodedStatus = pieces.get(2);
- this.state = Replica.getState(encodedStatus);
- isLeader = pieces.size() > 3 && "L".equals(pieces.get(3));
- duplicate = null;
- }
-
- public static State parse(String serialized) {
- List<String> pieces = StrUtils.splitSmart(serialized, ':');
- if (pieces.size() < 3) return null;
- return new State(serialized, pieces);
-
- }
-
- public State(String replica, Replica.State state, Boolean isLeader, int version) {
- this(replica, state, isLeader, version, null);
- }
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("{").append(path).append("/[").append(cversion).append("]: [");
+ appendStates(sb);
+ return sb.append("]}").toString();
+ }
- public State(String replica, Replica.State state, Boolean isLeader, int version, State duplicate) {
- this.replica = replica;
- this.state = state == null ? Replica.State.ACTIVE : state;
- this.isLeader = isLeader == null ? Boolean.FALSE : isLeader;
- this.version = version;
- asString = serialize();
- this.duplicate = duplicate;
- }
+ private StringBuilder appendStates(StringBuilder sb) {
+ states.forEachEntry(new BiConsumer<String, State>() {
+ int count = 0;
- @Override
- public void writeMap(EntryWriter ew) throws IOException {
- ew.put(NAME, replica);
- ew.put(VERSION, version);
- ew.put(ZkStateReader.STATE_PROP, state.toString());
- if (isLeader) ew.put(Slice.LEADER, isLeader);
- ew.putIfNotNull("duplicate", duplicate);
- }
+ @Override
+ public void accept(String s, State state) {
+ if (count++ > 0) sb.append(", ");
+ sb.append(state.asString);
+ for (State d : state.getDuplicates()) sb.append(d.asString);
+ }
+ });
+ return sb;
+ }
- private State insert(State duplicate) {
- assert this.replica.equals(duplicate.replica);
- if (this.version >= duplicate.version) {
- if (this.duplicate != null) {
- duplicate = new State(duplicate.replica, duplicate.state, duplicate.isLeader, duplicate.version, this.duplicate);
- }
- return new State(this.replica, this.state, this.isLeader, this.version, duplicate);
- } else {
- return duplicate.insert(this);
- }
- }
+ /**
+ * The state of a replica as stored as a node under /collections/collection-name/state.json/replica-state
+ */
+ public static class State implements MapWriter {
- /**
- * fetch duplicates entries for this replica
- */
- List<State> getDuplicates() {
- if (duplicate == null) return Collections.emptyList();
- List<State> result = new ArrayList<>();
- State current = duplicate;
- while (current != null) {
- result.add(current);
- current = current.duplicate;
- }
- return result;
- }
+ public final String replica;
- private String serialize() {
- StringBuilder sb = new StringBuilder(replica)
- .append(":")
- .append(version)
- .append(":")
- .append(state.shortName);
- if (isLeader) sb.append(":").append("L");
- return sb.toString();
- }
+ public final Replica.State state;
+ public final Boolean isLeader;
- @Override
- public String toString() {
- return asString;
- }
+ public final int version;
- @Override
- public boolean equals(Object o) {
- if (o instanceof State) {
- State that = (State) o;
- return Objects.equals(this.asString, that.asString);
- }
- return false;
- }
+ public final String asString;
- @Override
- public int hashCode() {
- return asString.hashCode();
- }
+ /**
+ * if there are multiple entries for the same replica, e.g: core_node_1:12:A core_node_1:13:D
+ * <p>
+ * the entry with '13' is the latest and the one with '12' is considered a duplicate
+ * <p>
+ * These are unlikely, but possible
+ */
+ final State duplicate;
+
+ private State(String serialized, List<String> pieces) {
+ this.asString = serialized;
+ replica = pieces.get(0);
+ version = Integer.parseInt(pieces.get(1));
+ String encodedStatus = pieces.get(2);
+ this.state = Replica.getState(encodedStatus);
+ isLeader = pieces.size() > 3 && "L".equals(pieces.get(3));
+ duplicate = null;
}
+ public static State parse(String serialized) {
+ List<String> pieces = StrUtils.splitSmart(serialized, ':');
+ if (pieces.size() < 3) return null;
+ return new State(serialized, pieces);
- public static abstract class WriteOps {
- private PerReplicaStates rs;
- List<Op> ops;
- private boolean preOp = true;
-
- /**
- * state of a replica is changed
- *
- * @param newState the new state
- */
- public static WriteOps flipState(String replica, Replica.State newState, PerReplicaStates rs) {
- return new WriteOps() {
- @Override
- protected List<Op> refresh(PerReplicaStates rs) {
- List<Op> ops = new ArrayList<>(2);
- State existing = rs.get(replica);
- if (existing == null) {
- ops.add(new Op(Op.Type.ADD, new State(replica, newState, Boolean.FALSE, 0)));
- } else {
- ops.add(new Op(Op.Type.ADD, new State(replica, newState, existing.isLeader, existing.version + 1)));
- addDeleteStaleNodes(ops, existing);
- }
- if (log.isDebugEnabled()) {
- log.debug("flipState on {}, {} -> {}, ops :{}", rs.path, replica, newState, ops);
- }
- return ops;
- }
- }.init(rs);
- }
-
- public PerReplicaStates getPerReplicaStates() {
- return rs;
- }
-
-
- /**Switch a collection from/to perReplicaState=true
- */
- public static WriteOps modifyCollection(DocCollection coll, boolean enable, PerReplicaStates prs) {
- return new WriteOps() {
- @Override
- List<Op> refresh(PerReplicaStates prs) {
- return enable ? enable(coll) : disable(prs);
- }
-
- List<Op> enable(DocCollection coll) {
- List<Op> result = new ArrayList<>();
- coll.forEachReplica((s, r) -> result.add(new Op(Op.Type.ADD, new State(r.getName(), r.getState(), r.isLeader(), 0))));
- return result;
- }
-
- List<Op> disable(PerReplicaStates prs) {
- List<Op> result = new ArrayList<>();
- prs.states.forEachEntry((s, state) -> result.add(new Op(Op.Type.DELETE, state)));
- return result;
- }
- }.init(prs);
-
- }
-
- /**
- * Flip the leader replica to a new one
- *
- * @param allReplicas allReplicas of the shard
- * @param next next leader
- */
- public static WriteOps flipLeader(Set<String> allReplicas, String next, PerReplicaStates rs) {
- return new WriteOps() {
-
- @Override
- protected List<Op> refresh(PerReplicaStates rs) {
- List<Op> ops = new ArrayList<>(4);
- if(next != null) {
- State st = rs.get(next);
- if (st != null) {
- if (!st.isLeader) {
- ops.add(new Op(Op.Type.ADD, new State(st.replica, Replica.State.ACTIVE, Boolean.TRUE, st.version + 1)));
- ops.add(new Op(Op.Type.DELETE, st));
- }
- //else do not do anything , that node is the leader
- } else {
- //there is no entry for the new leader.
- //create one
- ops.add(new Op(Op.Type.ADD, new State(next, Replica.State.ACTIVE, Boolean.TRUE, 0)));
- }
- }
-
- //now go through all other replicas and unset previous leader
- for (String r : allReplicas) {
- State st = rs.get(r);
- if (st == null) continue;//unlikely
- if (!Objects.equals(r, next)) {
- if(st.isLeader) {
- //some other replica is the leader now. unset
- ops.add(new Op(Op.Type.ADD, new State(st.replica, st.state, Boolean.FALSE, st.version + 1)));
- ops.add(new Op(Op.Type.DELETE, st));
- }
- }
- }
- if (log.isDebugEnabled()) {
- log.debug("flipLeader on:{}, {} -> {}, ops: {}", rs.path, allReplicas, next, ops);
- }
- return ops;
- }
-
- }.init(rs);
- }
-
- /**
- * Delete a replica entry from per-replica states
- *
- * @param replica name of the replica to be deleted
- */
- public static WriteOps deleteReplica(String replica, PerReplicaStates rs) {
- return new WriteOps() {
- @Override
- protected List<Op> refresh(PerReplicaStates rs) {
- List<Op> result;
- if (rs == null) {
- result = Collections.emptyList();
- } else {
- State state = rs.get(replica);
- result = addDeleteStaleNodes(new ArrayList<>(), state);
- }
- return result;
- }
- }.init(rs);
- }
+ }
- public static WriteOps addReplica(String replica, Replica.State state, boolean isLeader, PerReplicaStates rs) {
- return new WriteOps() {
- @Override
- protected List<Op> refresh(PerReplicaStates rs) {
- return singletonList(new Op(Op.Type.ADD,
- new State(replica, state, isLeader, 0)));
- }
- }.init(rs);
- }
+ /**
+ * Creates a state entry that has no other entries for the same replica chained behind it.
+ */
+ public State(String replica, Replica.State state, Boolean isLeader, int version) {
+ this(replica, state, isLeader, version, null);
+ }
- /**
- * mark a bunch of replicas as DOWN
- */
- public static WriteOps downReplicas(List<String> replicas, PerReplicaStates rs) {
- return new WriteOps() {
- @Override
- List<Op> refresh(PerReplicaStates rs) {
- List<Op> ops = new ArrayList<>();
- for (String replica : replicas) {
- State r = rs.get(replica);
- if (r != null) {
- if (r.state == Replica.State.DOWN && !r.isLeader) continue;
- ops.add(new Op(Op.Type.ADD, new State(replica, Replica.State.DOWN, Boolean.FALSE, r.version + 1)));
- addDeleteStaleNodes(ops, r);
- } else {
- ops.add(new Op(Op.Type.ADD, new State(replica, Replica.State.DOWN, Boolean.FALSE, 0)));
- }
- }
- if (log.isDebugEnabled()) {
- log.debug("for coll: {} down replicas {}, ops {}", rs, replicas, ops);
- }
- return ops;
- }
- }.init(rs);
- }
+ /**
+ * Creates a state entry.
+ *
+ * @param replica name of the replica
+ * @param state replica state; {@code null} is stored as {@code ACTIVE}
+ * @param isLeader leader flag; {@code null} is stored as {@code false}
+ * @param version version of this entry
+ * @param duplicate another entry for the same replica chained behind this one; may be {@code null}
+ */
+ public State(String replica, Replica.State state, Boolean isLeader, int version, State duplicate) {
+ this.replica = replica;
+ this.state = state == null ? Replica.State.ACTIVE : state;
+ this.isLeader = isLeader == null ? Boolean.FALSE : isLeader;
+ this.version = version;
+ // serialize() reads only replica/version/state/isLeader, so it is safe to call before duplicate is assigned
+ asString = serialize();
+ this.duplicate = duplicate;
+ }
- /**
- * Just creates and deletes a summy entry so that the {@link Stat#getCversion()} of states.json
- * is updated
- */
- public static WriteOps touchChildren() {
- WriteOps result = new WriteOps() {
- @Override
- List<Op> refresh(PerReplicaStates rs) {
- List<Op> ops = new ArrayList<>();
- State st = new State(".dummy." + System.nanoTime(), Replica.State.DOWN, Boolean.FALSE, 0);
- ops.add(new Op(Op.Type.ADD, st));
- ops.add(new Op(Op.Type.DELETE, st));
- if (log.isDebugEnabled()) {
- log.debug("touchChildren {}", ops);
- }
- return ops;
- }
- };
- result.preOp = false;
- result.ops = result.refresh(null);
- return result;
- }
+ /**
+ * Writes this entry as a map. Name, version and state are always written. The leader flag is
+ * written only when it is true, and the duplicate chain only when it is non-null.
+ */
+ @Override
+ public void writeMap(EntryWriter ew) throws IOException {
+ ew.put(NAME, replica);
+ ew.put(VERSION, version);
+ ew.put(ZkStateReader.STATE_PROP, state.toString());
+ if (isLeader) ew.put(Slice.LEADER, isLeader);
+ ew.putIfNotNull("duplicate", duplicate);
+ }
- WriteOps init(PerReplicaStates rs) {
- if (rs == null) return null;
- get(rs);
- return this;
+ /**
+ * Returns a new State that merges {@code duplicate} (another entry for the same replica) with this one.
+ * The entry with the higher version becomes the head; on a tie, this entry stays the head. The other
+ * entry is linked directly behind the head. Neither input object is modified.
+ * NOTE(review): when both entries already carry duplicate chains, the lower-version entry's own chain
+ * is replaced by the head's chain rather than merged. This looks safe only if callers insert single,
+ * freshly parsed entries; confirm against the callers.
+ */
+ private State insert(State duplicate) {
+ // both entries must describe the same replica (checked only when assertions are enabled)
+ assert this.replica.equals(duplicate.replica);
+ if (this.version >= duplicate.version) {
+ if (this.duplicate != null) {
+ // put the incoming entry directly behind this one, ahead of the existing duplicates
+ duplicate = new State(duplicate.replica, duplicate.state, duplicate.isLeader, duplicate.version, this.duplicate);
}
+ return new State(this.replica, this.state, this.isLeader, this.version, duplicate);
+ } else {
+ // the incoming entry is newer: let it become the head
+ return duplicate.insert(this);
+ }
+ }
- public List<Op> get() {
- return ops;
- }
+ /**
+ * Returns the other entries for this replica that are chained behind this one, in link order.
+ * Returns an empty list when there are none.
+ */
+ List<State> getDuplicates() {
+ if (duplicate == null) return Collections.emptyList();
+ List<State> result = new ArrayList<>();
+ State current = duplicate;
+ while (current != null) {
+ result.add(current);
+ current = current.duplicate;
+ }
+ return result;
+ }
- public List<Op> get(PerReplicaStates rs) {
- ops = refresh(rs);
- if (ops == null) ops = Collections.emptyList();
- this.rs = rs;
- return ops;
- }
+ /**
+ * Builds the compact form {@code <replica>:<version>:<state short name>[:L]}. The trailing {@code :L}
+ * marks the leader, e.g. {@code R1:1:D:L}.
+ */
+ private String serialize() {
+ StringBuilder sb = new StringBuilder(replica)
+ .append(":")
+ .append(version)
+ .append(":")
+ .append(state.shortName);
+ if (isLeader) sb.append(":").append("L");
+ return sb.toString();
+ }
- /**
- * To be executed before collection state.json is persisted
- */
- public boolean isPreOp() {
- return preOp;
- }
- /**
- * if a multi operation fails because the state got modified from behind,
- * refresh the operation and try again
- *
- * @param prs The new state
- */
- abstract List<Op> refresh(PerReplicaStates prs);
-
- @Override
- public String toString() {
- return ops.toString();
- }
+ /** Returns the compact serialized form computed in the constructor. */
+ @Override
+ public String toString() {
+ return asString;
}
@Override
- public String toString() {
- StringBuilder sb = new StringBuilder("{").append(path).append("/[").append(cversion).append("]: [");
- appendStates(sb);
- return sb.append("]}").toString();
+ public boolean equals(Object o) {
+ // equality uses only the serialized form (replica, version, state, leader flag); the duplicate chain is not compared
+ if (o instanceof State) {
+ State that = (State) o;
+ return Objects.equals(this.asString, that.asString);
+ }
+ return false;
}
- private StringBuilder appendStates(StringBuilder sb) {
- states.forEachEntry(new BiConsumer<String, State>() {
- int count = 0;
- @Override
- public void accept(String s, State state) {
- if(count++ > 0) sb.append(", ");
- sb.append(state.asString);
- for (State d : state.getDuplicates()) sb.append(d.asString);
- }
- });
- return sb;
+ @Override
+ public int hashCode() {
+ // consistent with equals(): both are derived from the serialized form
+ return asString.hashCode();
}
+ }
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
new file mode 100644
index 0000000..9e440f9
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.singletonList;
+
+/**
+ * This is a helper class that encapsulates various operations performed on the per-replica states
+ * Do not directly manipulate the per replica states as it can become difficult to debug them
+ */
+public class PerReplicaStatesOps {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** States passed to {@link #get(PerReplicaStates)}; retries in {@link #persist(String, SolrZkClient)} do not update it. */
+  private PerReplicaStates rs;
+
+  /** Operations computed by the most recent {@link #get(PerReplicaStates)} call (or by {@link #touchChildren()}). */
+  List<PerReplicaStates.Operation> ops;
+
+  /** See {@link #isPreOp()}. */
+  private boolean preOp = true;
+
+  /**
+   * Computes the operations for a snapshot of per-replica states. The function must read the state
+   * from its argument rather than from a snapshot captured at construction time. Otherwise a retry
+   * after a stale-state failure would recompute exactly the same, already-rejected operations.
+   */
+  final Function<PerReplicaStates, List<PerReplicaStates.Operation>> fun;
+
+  PerReplicaStatesOps(Function<PerReplicaStates, List<PerReplicaStates.Operation>> fun) {
+    this.fun = fun;
+  }
+
+  /**
+   * Persist a set of operations to Zookeeper as a single multi-op. An ADD creates a persistent child
+   * znode named after the state's serialized form; a DELETE removes it regardless of its version.
+   */
+  private void persist(List<PerReplicaStates.Operation> operations, String znode, SolrZkClient zkClient) throws KeeperException, InterruptedException {
+    if (operations == null || operations.isEmpty()) return;
+    if (log.isDebugEnabled()) {
+      log.debug("Per-replica state being persisted for : '{}', ops: {}", znode, operations);
+    }
+
+    // named zkOps so it does not shadow the 'ops' field
+    List<Op> zkOps = new ArrayList<>(operations.size());
+    for (PerReplicaStates.Operation op : operations) {
+      //the state of the replica is being updated
+      String path = znode + "/" + op.state.asString;
+      zkOps.add(op.typ == PerReplicaStates.Operation.Type.ADD ?
+          Op.create(path, null, zkClient.getZkACLProvider().getACLsToAdd(path), CreateMode.PERSISTENT) :
+          Op.delete(path, -1));
+    }
+    try {
+      zkClient.multi(zkOps, true);
+    } catch (KeeperException e) {
+      logMultiFailure(znode, zkClient, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Logs a failed multi-op together with the current children of {@code znode}. If listing the
+   * children fails, that failure goes into the message instead of replacing the original exception.
+   */
+  private static void logMultiFailure(String znode, SolrZkClient zkClient, KeeperException cause) throws InterruptedException {
+    Object children;
+    try {
+      children = zkClient.getChildren(znode, null, true);
+    } catch (KeeperException listFailure) {
+      children = "<unavailable: " + listFailure.getMessage() + ">";
+    }
+    log.error("multi op exception on '{}' : {}, current children: {}", znode, cause.getMessage(), children);
+  }
+
+  /**
+   * Adds a DELETE for {@code rs} and for every entry chained behind it, so that leftover entries of
+   * a replica are removed as well. A {@code null} state adds nothing.
+   *
+   * @return the same {@code ops} list, for chaining
+   */
+  private static List<PerReplicaStates.Operation> addDeleteStaleNodes(List<PerReplicaStates.Operation> ops, PerReplicaStates.State rs) {
+    while (rs != null) {
+      ops.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.DELETE, rs));
+      rs = rs.duplicate;
+    }
+    return ops;
+  }
+
+  /**
+   * Persist the computed operations. If a write fails because the state it was computed from is
+   * stale (a node already exists or is missing), fetch the latest per-replica states, recompute the
+   * operations from them, and try again, up to {@link PerReplicaStates#MAX_RETRIES} attempts.
+   *
+   * @throws KeeperException the last stale-state failure if every attempt failed, or any other ZK error
+   */
+  public void persist(String znode, SolrZkClient zkClient) throws KeeperException, InterruptedException {
+    List<PerReplicaStates.Operation> operations = ops;
+    KeeperException lastFailure = null;
+    for (int i = 0; i < PerReplicaStates.MAX_RETRIES; i++) {
+      try {
+        persist(operations, znode, zkClient);
+        return;
+      } catch (KeeperException.NodeExistsException | KeeperException.NoNodeException e) {
+        //state is stale
+        lastFailure = e;
+        if (log.isInfoEnabled()) {
+          log.info("stale state for {} , attempt: {}. retrying...", znode, i);
+        }
+        operations = refresh(PerReplicaStates.fetch(znode, zkClient, null));
+      }
+    }
+    // giving up silently would let callers believe the state change was written
+    if (lastFailure != null) {
+      throw lastFailure;
+    }
+  }
+
+  /** Returns the per-replica states the operations were first computed from. */
+  public PerReplicaStates getPerReplicaStates() {
+    return rs;
+  }
+
+  /**
+   * state of a replica is changed
+   *
+   * @param replica name of the replica
+   * @param newState the new state
+   * @param rs current per-replica states; when {@code null} this returns {@code null}
+   */
+  public static PerReplicaStatesOps flipState(String replica, Replica.State newState, PerReplicaStates rs) {
+    return new PerReplicaStatesOps(prs -> {
+      List<PerReplicaStates.Operation> operations = new ArrayList<>(2);
+      // read from prs (the snapshot being refreshed), never from the captured rs
+      PerReplicaStates.State existing = prs.get(replica);
+      if (existing == null) {
+        operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(replica, newState, Boolean.FALSE, 0)));
+      } else {
+        operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(replica, newState, existing.isLeader, existing.version + 1)));
+        addDeleteStaleNodes(operations, existing);
+      }
+      if (log.isDebugEnabled()) {
+        log.debug("flipState on {}, {} -> {}, ops :{}", prs.path, replica, newState, operations);
+      }
+      return operations;
+    }).init(rs);
+  }
+
+  /**
+   * Switch a collection from/to perReplicaState=true.
+   * Enabling adds an entry for every replica of {@code coll}; disabling deletes every existing entry.
+   */
+  public static PerReplicaStatesOps modifyCollection(DocCollection coll, boolean enable, PerReplicaStates rs) {
+    return new PerReplicaStatesOps(prs -> enable ? enable(coll) : disable(prs)).init(rs);
+  }
+
+  /** One ADD per replica of {@code coll}, carrying its current state and leader flag at version 0. */
+  private static List<PerReplicaStates.Operation> enable(DocCollection coll) {
+    List<PerReplicaStates.Operation> result = new ArrayList<>();
+    coll.forEachReplica((s, r) -> result.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(r.getName(), r.getState(), r.isLeader(), 0))));
+    return result;
+  }
+
+  /** One DELETE per entry currently in {@code prs}. */
+  private static List<PerReplicaStates.Operation> disable(PerReplicaStates prs) {
+    List<PerReplicaStates.Operation> result = new ArrayList<>();
+    prs.states.forEachEntry((s, state) -> result.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.DELETE, state)));
+    return result;
+  }
+
+  /**
+   * Flip the leader replica to a new one
+   *
+   * @param allReplicas allReplicas of the shard
+   * @param next next leader
+   */
+  public static PerReplicaStatesOps flipLeader(Set<String> allReplicas, String next, PerReplicaStates rs) {
+    return new PerReplicaStatesOps(prs -> {
+      List<PerReplicaStates.Operation> operations = new ArrayList<>();
+      if (next != null) {
+        PerReplicaStates.State st = prs.get(next);
+        if (st != null) {
+          if (!st.isLeader) {
+            operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(st.replica, Replica.State.ACTIVE, Boolean.TRUE, st.version + 1)));
+            operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.DELETE, st));
+          }
+          //else do not do anything , that node is the leader
+        } else {
+          //there is no entry for the new leader.
+          //create one
+          operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(next, Replica.State.ACTIVE, Boolean.TRUE, 0)));
+        }
+      }
+
+      //now go through all other replicas and unset previous leader
+      for (String r : allReplicas) {
+        PerReplicaStates.State st = prs.get(r);
+        if (st == null) continue;//unlikely
+        if (!Objects.equals(r, next) && st.isLeader) {
+          //some other replica is the leader now. unset
+          operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(st.replica, st.state, Boolean.FALSE, st.version + 1)));
+          operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.DELETE, st));
+        }
+      }
+      if (log.isDebugEnabled()) {
+        log.debug("flipLeader on:{}, {} -> {}, ops: {}", prs.path, allReplicas, next, operations);
+      }
+      return operations;
+    }).init(rs);
+  }
+
+  /**
+   * Delete a replica entry from per-replica states, including any duplicate entries chained behind it.
+   *
+   * @param replica name of the replica to be deleted
+   */
+  public static PerReplicaStatesOps deleteReplica(String replica, PerReplicaStates rs) {
+    return new PerReplicaStatesOps(prs -> {
+      if (prs == null) {
+        return Collections.emptyList();
+      }
+      return addDeleteStaleNodes(new ArrayList<>(), prs.get(replica));
+    }).init(rs);
+  }
+
+  /** Adds a version-0 entry for {@code replica} with the given state and leader flag. */
+  public static PerReplicaStatesOps addReplica(String replica, Replica.State state, boolean isLeader, PerReplicaStates rs) {
+    return new PerReplicaStatesOps(perReplicaStates -> singletonList(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD,
+        new PerReplicaStates.State(replica, state, isLeader, 0)))).init(rs);
+  }
+
+  /**
+   * mark a bunch of replicas as DOWN (and not leader). Replicas that are already DOWN and not leader are skipped.
+   */
+  public static PerReplicaStatesOps downReplicas(List<String> replicas, PerReplicaStates rs) {
+    return new PerReplicaStatesOps(prs -> {
+      List<PerReplicaStates.Operation> operations = new ArrayList<>();
+      for (String replica : replicas) {
+        PerReplicaStates.State r = prs.get(replica);
+        if (r != null) {
+          if (r.state == Replica.State.DOWN && !r.isLeader) continue;
+          operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(replica, Replica.State.DOWN, Boolean.FALSE, r.version + 1)));
+          addDeleteStaleNodes(operations, r);
+        } else {
+          operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(replica, Replica.State.DOWN, Boolean.FALSE, 0)));
+        }
+      }
+      if (log.isDebugEnabled()) {
+        log.debug("for coll: {} down replicas {}, ops {}", prs, replicas, operations);
+      }
+      return operations;
+    }).init(rs);
+  }
+
+  /**
+   * Just creates and deletes a dummy entry so that the {@link Stat#getCversion()} of states.json
+   * is updated. The resulting ops are not a pre-op (see {@link #isPreOp()}).
+   */
+  public static PerReplicaStatesOps touchChildren() {
+    PerReplicaStatesOps result = new PerReplicaStatesOps(prs -> {
+      List<PerReplicaStates.Operation> operations = new ArrayList<>(2);
+      PerReplicaStates.State st = new PerReplicaStates.State(".dummy." + System.nanoTime(), Replica.State.DOWN, Boolean.FALSE, 0);
+      operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, st));
+      operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.DELETE, st));
+      if (log.isDebugEnabled()) {
+        log.debug("touchChildren {}", operations);
+      }
+      return operations;
+    });
+    result.preOp = false;
+    result.ops = result.refresh(null);
+    return result;
+  }
+
+  /**
+   * Computes the initial operations from {@code rs}.
+   *
+   * @return this instance, or {@code null} when {@code rs} is {@code null}
+   */
+  PerReplicaStatesOps init(PerReplicaStates rs) {
+    if (rs == null) return null;
+    get(rs);
+    return this;
+  }
+
+  /** Returns the operations computed by the last {@link #get(PerReplicaStates)} call. */
+  public List<PerReplicaStates.Operation> get() {
+    return ops;
+  }
+
+  /** Recomputes the operations from {@code rs}, remembers {@code rs}, and returns the (never null) result. */
+  public List<PerReplicaStates.Operation> get(PerReplicaStates rs) {
+    ops = refresh(rs);
+    if (ops == null) ops = Collections.emptyList();
+    this.rs = rs;
+    return ops;
+  }
+
+  /**
+   * To be executed before collection state.json is persisted
+   */
+  public boolean isPreOp() {
+    return preOp;
+  }
+
+  /**
+   * This method should compute the set of ZK operations for a given action
+   * for instance, a state change may result in 2 operations on per-replica states (1 CREATE and 1 DELETE)
+   * if a multi operation fails because the state got modified from behind,
+   * refresh the operation and try again
+   *
+   * @param prs The latest state
+   * @return the operations, or {@code null} when no function was supplied
+   */
+  List<PerReplicaStates.Operation> refresh(PerReplicaStates prs) {
+    if (fun != null) return fun.apply(prs);
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    // ops is null until get()/init() has run
+    return String.valueOf(ops);
+  }
+}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 118dc05..9960e34 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -672,12 +672,12 @@ public class ZkStateReader implements SolrCloseable {
if (!allowCached || lastUpdateTime < 0 || System.nanoTime() - lastUpdateTime > LAZY_CACHE_TIME) {
boolean shouldFetch = true;
if (cachedDocCollection != null) {
- Stat exists = null;
+ Stat freshStats = null;
try {
- exists = zkClient.exists(getCollectionPath(collName), null, true);
+ freshStats = zkClient.exists(getCollectionPath(collName), null, true);
} catch (Exception e) {
}
- if (exists != null && !cachedDocCollection.isModified(exists.getVersion(), exists.getCversion())) {
+ if (freshStats != null && !cachedDocCollection.isModified(freshStats.getVersion(), freshStats.getCversion())) {
shouldFetch = false;
}
}
@@ -855,14 +855,16 @@ public class ZkStateReader implements SolrCloseable {
* Get shard leader properties, with retry if none exist.
*/
public Replica getLeaderRetry(String collection, String shard, int timeout) throws InterruptedException {
-
+ AtomicReference<DocCollection> coll = new AtomicReference<>();
AtomicReference<Replica> leader = new AtomicReference<>();
try {
waitForState(collection, timeout, TimeUnit.MILLISECONDS, (n, c) -> {
if (c == null)
return false;
+ coll.set(c);
Replica l = getLeader(n, c, shard);
if (l != null) {
+ log.debug("leader found for {}/{} to be {}", collection, shard, l);
leader.set(l);
return true;
}
diff --git a/solr/solrj/src/resources/apispec/collections.Commands.json b/solr/solrj/src/resources/apispec/collections.Commands.json
index c3abc60..747cc8b 100644
--- a/solr/solrj/src/resources/apispec/collections.Commands.json
+++ b/solr/solrj/src/resources/apispec/collections.Commands.json
@@ -90,6 +90,11 @@
"type": "boolean",
"description": "If true then request will complete only when all affected replicas become active.",
"default": false
+ },
+ "perReplicaState": {
+ "type": "boolean",
+ "description": "Use Per replica states",
+ "default": false
}
},
"required": [
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
index 6d6afc6..f655b21 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
@@ -47,6 +47,7 @@ import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.client.solrj.response.SolrPingResponse;
@@ -83,6 +84,8 @@ import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
+
/**
* This test would be faster if we simulated the zk state instead.
@@ -239,7 +242,9 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
@Test
public void testRouting() throws Exception {
- CollectionAdminRequest.createCollection("routing_collection", "conf", 2, 1).process(cluster.getSolrClient());
+ CollectionAdminRequest.createCollection("routing_collection", "conf", 2, 1)
+ .setPerReplicaState(USE_PER_REPLICA_STATE)
+ .process(cluster.getSolrClient());
cluster.waitForActiveCollection("routing_collection", 2, 2);
AbstractUpdateRequest request = new UpdateRequest()
@@ -1055,7 +1060,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
CollectionAdminRequest.createCollection("versions_collection", "conf", 2, 1)
.process(cluster.getSolrClient());
- final String testCollection = "perReplicaState_test";
+ String testCollection = "perReplicaState_test";
int liveNodes = cluster.getJettySolrRunners().size();
CollectionAdminRequest.createCollection(testCollection, "conf", 2, 2)
.setPerReplicaState(Boolean.TRUE)
@@ -1068,6 +1073,17 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
c.forEachReplica((s, replica) -> assertNotNull(replica.getReplicaState()));
PerReplicaStates prs = PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), cluster.getZkClient(), null);
assertEquals(4, prs.states.size());
+ testCollection = "perReplicaState_testv2";
+ new V2Request.Builder("/collections")
+ .withMethod(POST)
+ .withPayload("{create: {name: perReplicaState_testv2, config : conf, numShards : 2, nrtReplicas : 2, perReplicaState : true, maxShardsPerNode : 5}}")
+ .build()
+ .process(cluster.getSolrClient());
+ cluster.waitForActiveCollection(testCollection, 2, 4);
+ c = cluster.getSolrClient().getZkStateReader().getCollection(testCollection);
+ c.forEachReplica((s, replica) -> assertNotNull(replica.getReplicaState()));
+ prs = PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), cluster.getZkClient(), null);
+ assertEquals(4, prs.states.size());
}
}
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java
index 25a095d..b6ea6f7 100644
--- a/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java
@@ -18,8 +18,6 @@
package org.apache.solr.common.cloud;
-import java.util.Collections;
-import java.util.List;
import java.util.Set;
import com.google.common.collect.ImmutableList;
@@ -31,107 +29,105 @@ import org.junit.After;
import org.junit.Before;
public class TestPerReplicaStates extends SolrCloudTestCase {
- @Before
- public void prepareCluster() throws Exception {
- configureCluster(4)
- .configure();
+ @Before
+ public void prepareCluster() throws Exception {
+ configureCluster(4)
+ .configure();
+ }
+
+ @After
+ public void tearDownCluster() throws Exception {
+ shutdownCluster();
+ }
+
+ // serialization round trip of a single entry: name:version:state[:L]
+ public void testBasic() {
+ PerReplicaStates.State rs = new PerReplicaStates.State("R1", State.ACTIVE, Boolean.FALSE, 1);
+ assertEquals("R1:1:A", rs.asString);
+
+ rs = new PerReplicaStates.State("R1", State.DOWN, Boolean.TRUE, 1);
+ assertEquals("R1:1:D:L", rs.asString);
+ rs = PerReplicaStates.State.parse (rs.asString);
+ assertEquals(State.DOWN, rs.state);
+
+ }
+
+ // entries for the same replica collapse to the highest version regardless of input order;
+ // findModifiedReplicas reports changed (R3), added (R4) and removed (R2) replicas
+ public void testEntries() {
+ PerReplicaStates entries = new PerReplicaStates("state.json", 0, ImmutableList.of("R1:2:A", "R1:1:A:L", "R1:0:D", "R2:0:D", "R3:0:A"));
+ assertEquals(2, entries.get("R1").version);
+ entries = new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:0:A", "R1:0:D"));
+ assertEquals(2, entries.get("R1").version);
+ assertEquals(2, entries.get("R1").getDuplicates().size());
+ Set<String> modified = PerReplicaStates.findModifiedReplicas(entries, new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:1:A", "R1:0:D")));
+ assertEquals(1, modified.size());
+ assertTrue(modified.contains("R3"));
+ modified = PerReplicaStates.findModifiedReplicas( entries,
+ new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:1:A", "R1:0:D", "R4:0:A")));
+ assertEquals(2, modified.size());
+ assertTrue(modified.contains("R3"));
+ assertTrue(modified.contains("R4"));
+ modified = PerReplicaStates.findModifiedReplicas( entries,
+ new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R3:1:A", "R1:0:D", "R4:0:A")));
+ assertEquals(3, modified.size());
+ assertTrue(modified.contains("R3"));
+ assertTrue(modified.contains("R4"));
+ assertTrue(modified.contains("R2"));
+
+
+ }
+
+ // exercises PerReplicaStatesOps against real znodes: add, flip state, delete and leader changes
+ public void testReplicaStateOperations() throws Exception {
+ String root = "/testReplicaStateOperations";
+ cluster.getZkClient().create(root, null, CreateMode.PERSISTENT, true);
+
+ ImmutableList<String> states = ImmutableList.of("R1:2:A", "R1:1:A:L", "R1:0:D", "R3:0:A", "R4:13:A");
+
+ for (String state : states) {
+ cluster.getZkClient().create(root + "/" + state, null, CreateMode.PERSISTENT, true);
}
- @After
- public void tearDownCluster() throws Exception {
- shutdownCluster();
- }
-
- public void testBasic() {
- PerReplicaStates.State rs = new PerReplicaStates.State("R1", State.ACTIVE, Boolean.FALSE, 1);
- assertEquals("R1:1:A", rs.asString);
-
- rs = new PerReplicaStates.State("R1", State.DOWN, Boolean.TRUE, 1);
- assertEquals("R1:1:D:L", rs.asString);
- rs = PerReplicaStates.State.parse (rs.asString);
- assertEquals(State.DOWN, rs.state);
-
- }
-
- public void testEntries() {
- PerReplicaStates entries = new PerReplicaStates("state.json", 0, ImmutableList.of("R1:2:A", "R1:1:A:L", "R1:0:D", "R2:0:D", "R3:0:A"));
- assertEquals(2, entries.get("R1").version);
- entries = new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:0:A", "R1:0:D"));
- assertEquals(2, entries.get("R1").version);
- assertEquals(2, entries.get("R1").getDuplicates().size());
- Set<String> modified = PerReplicaStates.findModifiedReplicas(entries, new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:1:A", "R1:0:D")));
- assertEquals(1, modified.size());
- assertTrue(modified.contains("R3"));
- modified = PerReplicaStates.findModifiedReplicas( entries,
- new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:1:A", "R1:0:D", "R4:0:A")));
- assertEquals(2, modified.size());
- assertTrue(modified.contains("R3"));
- assertTrue(modified.contains("R4"));
- modified = PerReplicaStates.findModifiedReplicas( entries,
- new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R3:1:A", "R1:0:D", "R4:0:A")));
- assertEquals(3, modified.size());
- assertTrue(modified.contains("R3"));
- assertTrue(modified.contains("R4"));
- assertTrue(modified.contains("R2"));
-
-
- }
-
- public void testReplicaStateOperations() throws Exception {
- String root = "/testReplicaStateOperations";
- cluster.getZkClient().create(root, null, CreateMode.PERSISTENT, true);
-
- ImmutableList<String> states = ImmutableList.of("R1:2:A", "R1:1:A:L", "R1:0:D", "R3:0:A", "R4:13:A");
-
- for (String state : states) {
- cluster.getZkClient().create(root + "/" + state, null, CreateMode.PERSISTENT, true);
- }
-
- ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
- PerReplicaStates rs = zkStateReader.getReplicaStates(new PerReplicaStates(root, 0, Collections.emptyList()));
- assertEquals(3, rs.states.size());
- assertTrue(rs.cversion >= 5);
-
- List<PerReplicaStates.Op> ops = PerReplicaStates.WriteOps.addReplica("R5",State.ACTIVE, false, rs).get();
-
- assertEquals(1, ops.size());
- assertEquals(PerReplicaStates.Op.Type.ADD ,ops.get(0).typ );
- PerReplicaStates.persist(ops, root,cluster.getZkClient());
- rs = zkStateReader.getReplicaStates(root);
- assertEquals(4, rs.states.size());
- assertTrue(rs.cversion >= 6);
- assertEquals(6, cluster.getZkClient().getChildren(root, null,true).size());
- ops = PerReplicaStates.WriteOps.flipState("R1", State.DOWN , rs).get();
-
- assertEquals(4, ops.size());
- assertEquals(PerReplicaStates.Op.Type.ADD, ops.get(0).typ);
- assertEquals(PerReplicaStates.Op.Type.DELETE, ops.get(1).typ);
- assertEquals(PerReplicaStates.Op.Type.DELETE, ops.get(2).typ);
- assertEquals(PerReplicaStates.Op.Type.DELETE, ops.get(3).typ);
- PerReplicaStates.persist(ops, root,cluster.getZkClient());
- rs = zkStateReader.getReplicaStates(root);
- assertEquals(4, rs.states.size());
- assertEquals(3, rs.states.get("R1").version);
-
- ops = PerReplicaStates.WriteOps.deleteReplica("R5" , rs).get();
- assertEquals(1, ops.size());
- PerReplicaStates.persist(ops, root,cluster.getZkClient());
-
- rs = zkStateReader.getReplicaStates(root);
- assertEquals(3, rs.states.size());
-
- ops = PerReplicaStates.WriteOps.flipLeader(ImmutableSet.of("R4","R3","R1"), "R4",rs).get();
-
- assertEquals(2, ops.size());
- assertEquals(PerReplicaStates.Op.Type.ADD, ops.get(0).typ);
- assertEquals(PerReplicaStates.Op.Type.DELETE, ops.get(1).typ);
- PerReplicaStates.persist(ops, root,cluster.getZkClient());
- rs = zkStateReader.getReplicaStates(root);
- ops = PerReplicaStates.WriteOps.flipLeader(ImmutableSet.of("R4","R3","R1"),"R3",rs).get();
- assertEquals(4, ops.size());
- PerReplicaStates.persist(ops, root,cluster.getZkClient());
- rs = zkStateReader.getReplicaStates(root);
- assertTrue(rs.get("R3").isLeader);
- }
+ ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
+ // five child znodes, but R1 has three entries, so three distinct replicas
+ PerReplicaStates rs = PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
+ assertEquals(3, rs.states.size());
+ assertTrue(rs.cversion >= 5);
+
+ PerReplicaStatesOps ops = PerReplicaStatesOps.addReplica("R5",State.ACTIVE, false, rs);
+ assertEquals(1, ops.get().size());
+ assertEquals(PerReplicaStates.Operation.Type.ADD , ops.ops.get(0).typ );
+ ops.persist(root,cluster.getZkClient());
+ rs = PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
+ assertEquals(4, rs.states.size());
+ assertTrue(rs.cversion >= 6);
+ assertEquals(6, cluster.getZkClient().getChildren(root, null,true).size());
+ // one ADD for the new R1 entry plus a DELETE for each of R1's three existing entries
+ ops = PerReplicaStatesOps.flipState("R1", State.DOWN , rs);
+
+ assertEquals(4, ops.ops.size());
+ assertEquals(PerReplicaStates.Operation.Type.ADD, ops.ops.get(0).typ);
+ assertEquals(PerReplicaStates.Operation.Type.DELETE, ops.ops.get(1).typ);
+ assertEquals(PerReplicaStates.Operation.Type.DELETE, ops.ops.get(2).typ);
+ assertEquals(PerReplicaStates.Operation.Type.DELETE, ops.ops.get(3).typ);
+ ops.persist(root, cluster.getZkClient());
+ rs = PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
+ assertEquals(4, rs.states.size());
+ // flipState bumps the highest existing version (2) by one
+ assertEquals(3, rs.states.get("R1").version);
+
+ ops = PerReplicaStatesOps.deleteReplica("R5" , rs);
+ assertEquals(1, ops.ops.size());
+ ops.persist(root,cluster.getZkClient());
+
+ rs = PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
+ assertEquals(3, rs.states.size());
+
+ // only R4 changes: no other replica's current entry is marked leader
+ ops = PerReplicaStatesOps.flipLeader(ImmutableSet.of("R4","R3","R1"), "R4",rs);
+ assertEquals(2, ops.ops.size());
+ assertEquals(PerReplicaStates.Operation.Type.ADD, ops.ops.get(0).typ);
+ assertEquals(PerReplicaStates.Operation.Type.DELETE, ops.ops.get(1).typ);
+ ops.persist(root,cluster.getZkClient());
+ rs = PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
+ // R3 becomes leader (ADD + DELETE) and R4 loses leadership (ADD + DELETE)
+ ops = PerReplicaStatesOps.flipLeader(ImmutableSet.of("R4","R3","R1"),"R3",rs);
+ assertEquals(4, ops.ops.size());
+ ops.persist(root,cluster.getZkClient());
+ rs =PerReplicaStates.fetch (root, zkStateReader.getZkClient(),null);
+ assertTrue(rs.get("R3").isLeader);
+ }
}