You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2020/12/15 13:14:32 UTC
[lucene-solr] 01/01: solr13951
This is an automated email from the ASF dual-hosted git repository.
noble pushed a commit to branch jira/solr_13951
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 42b9e598f37800f71d4fbad66588137c7d45a129
Author: noblepaul <no...@gmail.com>
AuthorDate: Wed Dec 16 00:14:03 2020 +1100
solr13951
---
.../apache/solr/cloud/overseer/NodeMutator.java | 15 +-
.../apache/solr/cloud/overseer/ReplicaMutator.java | 19 +-
.../apache/solr/cloud/overseer/SliceMutator.java | 37 +-
.../apache/solr/cloud/overseer/ZkStateWriter.java | 80 ++-
.../apache/solr/cloud/overseer/ZkWriteCommand.java | 20 +
.../client/solrj/impl/SolrClientCloudManager.java | 3 +
.../org/apache/solr/common/cloud/ClusterState.java | 71 +++
.../apache/solr/common/cloud/DocCollection.java | 71 ++-
.../apache/solr/common/cloud/PerReplicaStates.java | 587 +++++++++++++++++++++
.../java/org/apache/solr/common/cloud/Replica.java | 66 ++-
.../java/org/apache/solr/common/cloud/Slice.java | 22 +-
.../org/apache/solr/common/cloud/SolrZkClient.java | 12 +
.../apache/solr/common/cloud/ZkStateReader.java | 99 +++-
.../solr/common/cloud/TestPerReplicaStates.java | 137 +++++
14 files changed, 1203 insertions(+), 36 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
index 3f1971e..77f0550 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/NodeMutator.java
@@ -26,6 +26,7 @@ import java.util.Map.Entry;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
@@ -45,6 +46,8 @@ public class NodeMutator {
Map<String, DocCollection> collections = clusterState.getCollectionsMap();
for (Map.Entry<String, DocCollection> entry : collections.entrySet()) {
+ List<String> downedReplicas = new ArrayList<>();
+
String collection = entry.getKey();
DocCollection docCollection = entry.getValue();
@@ -64,10 +67,11 @@ public class NodeMutator {
if (rNodeName.equals(nodeName)) {
log.debug("Update replica state for {} to {}", replica, Replica.State.DOWN);
Map<String, Object> props = replica.shallowCopy();
- Replica newReplica = new Replica(replica.getName(), replica.node, replica.collection, slice.getName(), replica.core,
- Replica.State.DOWN, replica.type, props);
+ props.put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
+ Replica newReplica = new Replica(replica.getName(), props, collection, slice.getName());
newReplicas.put(replica.getName(), newReplica);
needToUpdateCollection = true;
+ downedReplicas.add(replica.getName());
}
}
@@ -76,7 +80,12 @@ public class NodeMutator {
}
if (needToUpdateCollection) {
- zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy)));
+ if(docCollection.isPerReplicaState()) {
+ zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy),
+ PerReplicaStates.WriteOps.downReplicas(downedReplicas, docCollection.getPerReplicaStates()), false));
+ } else {
+ zkWriteCommands.add(new ZkWriteCommand(collection, docCollection.copyWithSlices(slicesCopy)));
+ }
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
index f849143..747cf90 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ReplicaMutator.java
@@ -40,7 +40,9 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
@@ -50,6 +52,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.overseer.CollectionMutator.checkCollectionKeyExistence;
import static org.apache.solr.cloud.overseer.CollectionMutator.checkKeyExistence;
+import static org.apache.solr.cloud.overseer.SliceMutator.getZkClient;
import static org.apache.solr.common.params.CommonParams.NAME;
public class ReplicaMutator {
@@ -57,10 +60,12 @@ public class ReplicaMutator {
protected final SolrCloudManager cloudManager;
protected final DistribStateManager stateManager;
+ protected SolrZkClient zkClient;
public ReplicaMutator(SolrCloudManager cloudManager) {
this.cloudManager = cloudManager;
this.stateManager = cloudManager.getDistribStateManager();
+ this.zkClient = getZkClient(cloudManager);
}
protected Replica setProperty(Replica replica, String key, String value) {
@@ -260,6 +265,7 @@ public class ReplicaMutator {
log.info("Failed to update state because the replica does not exist, {}", message);
return ZkStateWriter.NO_OP;
}
+ boolean persistCollectionState = collection != null && collection.isPerReplicaState();
if (coreNodeName == null) {
coreNodeName = ClusterStateMutator.getAssignedCoreNodeName(collection,
@@ -271,6 +277,7 @@ public class ReplicaMutator {
log.info("Failed to update state because the replica does not exist, {}", message);
return ZkStateWriter.NO_OP;
}
+ persistCollectionState = true;
// if coreNodeName is null, auto assign one
coreNodeName = Assign.assignCoreNodeName(stateManager, collection);
}
@@ -285,6 +292,7 @@ public class ReplicaMutator {
if (sliceName != null) {
log.debug("shard={} is already registered", sliceName);
}
+ persistCollectionState = true;
}
if (sliceName == null) {
//request new shardId
@@ -295,13 +303,15 @@ public class ReplicaMutator {
}
sliceName = Assign.assignShard(collection, numShards);
log.info("Assigning new node to shard shard={}", sliceName);
+ persistCollectionState = true;
}
Slice slice = collection != null ? collection.getSlice(sliceName) : null;
Map<String, Object> replicaProps = new LinkedHashMap<>(message.getProperties());
+ Replica oldReplica = null;
if (slice != null) {
- Replica oldReplica = slice.getReplica(coreNodeName);
+ oldReplica = slice.getReplica(coreNodeName);
if (oldReplica != null) {
if (oldReplica.containsKey(ZkStateReader.LEADER_PROP)) {
replicaProps.put(ZkStateReader.LEADER_PROP, oldReplica.get(ZkStateReader.LEADER_PROP));
@@ -366,7 +376,12 @@ public class ReplicaMutator {
DocCollection newCollection = CollectionMutator.updateSlice(collectionName, collection, slice);
log.debug("Collection is now: {}", newCollection);
- return new ZkWriteCommand(collectionName, newCollection);
+ if(collection != null && collection.isPerReplicaState()) {
+ PerReplicaStates prs = PerReplicaStates.fetch(collection.getZNode(), zkClient, collection.getPerReplicaStates());
+ return new ZkWriteCommand(collectionName, newCollection, PerReplicaStates.WriteOps.flipState(replica.getName(), replica.getState(), prs), persistCollectionState);
+ } else{
+ return new ZkWriteCommand(collectionName, newCollection);
+ }
}
private DocCollection checkAndCompleteShardSplit(ClusterState prevState, DocCollection collection, String coreNodeName, String sliceName, Replica replica) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
index 40ab1a3..0d95993 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/SliceMutator.java
@@ -25,14 +25,17 @@ import java.util.Set;
import com.google.common.collect.ImmutableSet;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.Assign;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.RoutingRule;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -51,10 +54,21 @@ public class SliceMutator {
protected final SolrCloudManager cloudManager;
protected final DistribStateManager stateManager;
+ protected final SolrZkClient zkClient;
public SliceMutator(SolrCloudManager cloudManager) {
this.cloudManager = cloudManager;
this.stateManager = cloudManager.getDistribStateManager();
+ this.zkClient = getZkClient(cloudManager);
+ }
+
+ static SolrZkClient getZkClient(SolrCloudManager cloudManager) {
+ if (cloudManager instanceof SolrClientCloudManager) {
+ SolrClientCloudManager manager = (SolrClientCloudManager) cloudManager;
+ return manager.getZkClient();
+ } else {
+ return null;
+ }
}
public ZkWriteCommand addReplica(ClusterState clusterState, ZkNodeProps message) {
@@ -80,7 +94,14 @@ public class SliceMutator {
ZkStateReader.STATE_PROP, message.getStr(ZkStateReader.STATE_PROP),
ZkStateReader.NODE_NAME_PROP, message.getStr(ZkStateReader.NODE_NAME_PROP),
ZkStateReader.REPLICA_TYPE, message.get(ZkStateReader.REPLICA_TYPE)), coll, slice);
- return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica));
+
+ if(collection.isPerReplicaState()) {
+ PerReplicaStates prs = PerReplicaStates.fetch(collection.getZNode(), zkClient, collection.getPerReplicaStates());
+ return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica),
+ PerReplicaStates.WriteOps.addReplica(replica.getName(), replica.getState(), replica.isLeader(), prs), true);
+ } else {
+ return new ZkWriteCommand(coll, updateReplica(collection, sl, replica.getName(), replica));
+ }
}
public ZkWriteCommand removeReplica(ClusterState clusterState, ZkNodeProps message) {
@@ -124,6 +145,7 @@ public class SliceMutator {
Slice slice = slices.get(sliceName);
Replica oldLeader = slice.getLeader();
+ Replica newLeader = null;
final Map<String, Replica> newReplicas = new LinkedHashMap<>();
for (Replica replica : slice.getReplicas()) {
// TODO: this should only be calculated once and cached somewhere?
@@ -132,7 +154,7 @@ public class SliceMutator {
if (replica == oldLeader && !coreURL.equals(leaderUrl)) {
replica = new ReplicaMutator(cloudManager).unsetLeader(replica);
} else if (coreURL.equals(leaderUrl)) {
- replica = new ReplicaMutator(cloudManager).setLeader(replica);
+ newLeader= replica = new ReplicaMutator(cloudManager).setLeader(replica);
}
newReplicas.put(replica.getName(), replica);
@@ -141,7 +163,16 @@ public class SliceMutator {
Map<String, Object> newSliceProps = slice.shallowCopy();
newSliceProps.put(Slice.REPLICAS, newReplicas);
slice = new Slice(slice.getName(), newReplicas, slice.getProperties(), collectionName);
- return new ZkWriteCommand(collectionName, CollectionMutator.updateSlice(collectionName, coll, slice));
+ if(coll.isPerReplicaState()) {
+ PerReplicaStates prs = PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
+ return new ZkWriteCommand(collectionName, CollectionMutator.updateSlice(collectionName, coll, slice),
+ PerReplicaStates.WriteOps.flipLeader(
+ slice.getReplicaNames(),
+ newLeader == null ? null : newLeader.getName(),
+ prs), false);
+ } else {
+ return new ZkWriteCommand(collectionName, CollectionMutator.updateSlice(collectionName, coll, slice));
+ }
}
public ZkWriteCommand updateShardState(ClusterState clusterState, ZkNodeProps message) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 155fbc2..26e100f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -27,6 +27,7 @@ import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.Stats;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;
@@ -63,9 +64,10 @@ public class ZkStateWriter {
protected final ZkStateReader reader;
protected final Stats stats;
- protected Map<String, DocCollection> updates = new HashMap<>();
+ protected Map<String, ZkWriteCommand> updates = new HashMap<>();
private int numUpdates = 0;
protected ClusterState clusterState = null;
+ protected boolean isClusterStateModified = false;
protected long lastUpdatedTime = 0;
/**
@@ -111,15 +113,49 @@ public class ZkStateWriter {
if (cmds.isEmpty()) return prevState;
if (isNoOps(cmds)) return prevState;
+ boolean forceFlush = false;
+ if(cmds.size() == 1) {
+ //most messages result in only one command. let's deal with it right away
+ ZkWriteCommand cmd = cmds.get(0);
+ if (cmd.collection != null && cmd.collection.isPerReplicaState()) {
+ //we do not wish to batch any updates for collections with per-replica state because
+ // these changes go to individual ZK nodes and there is zero advantage to batching
+ //now check if there are any updates for the same collection already present
+ if (updates.containsKey(cmd.name)) {
+ //this should not happen
+ // but let's get those updates out anyway
+ writeUpdate(updates.remove(cmd.name));
+ }
+ //now let's write the current message
+ try {
+ return writeUpdate(cmd);
+ } finally {
+ if(callback !=null) callback.onWrite();
+ }
+ }
+ } else {
+ //more than one command was created as a result of this message
+ for (ZkWriteCommand cmd : cmds) {
+ if(cmd.collection != null && cmd.collection.isPerReplicaState()) {
+ // we don't try to optimize for this case. let's flush out all after this
+ forceFlush = true;
+ break;
+ }
+ }
+ }
+
+
for (ZkWriteCommand cmd : cmds) {
if (cmd == NO_OP) continue;
prevState = prevState.copyWith(cmd.name, cmd.collection);
- updates.put(cmd.name, cmd.collection);
- numUpdates++;
+ if (cmd.collection == null) {
+ updates.put(cmd.name, cmd);
+ numUpdates++;
+ }
}
clusterState = prevState;
- if (maybeFlushAfter()) {
+ if (forceFlush || maybeFlushAfter()) {
ClusterState state = writePendingUpdates();
if (callback != null) {
callback.onWrite();
@@ -147,9 +183,17 @@ public class ZkStateWriter {
}
public boolean hasPendingUpdates() {
- return numUpdates != 0;
+ return numUpdates != 0 || isClusterStateModified;
+ }
+ public ClusterState writeUpdate(ZkWriteCommand command) throws IllegalStateException, KeeperException, InterruptedException {
+ Map<String, ZkWriteCommand> commands = new HashMap<>();
+ commands.put(command.name, command);
+ return writePendingUpdates(commands);
}
+ public ClusterState writePendingUpdates() throws KeeperException, InterruptedException {
+ return writePendingUpdates(updates);
+ }
/**
* Writes all pending updates to ZooKeeper and returns the modified cluster state
*
@@ -158,20 +202,30 @@ public class ZkStateWriter {
* @throws KeeperException if any ZooKeeper operation results in an error
* @throws InterruptedException if the current thread is interrupted
*/
- public ClusterState writePendingUpdates() throws IllegalStateException, KeeperException, InterruptedException {
+ public ClusterState writePendingUpdates(Map<String, ZkWriteCommand> updates) throws IllegalStateException, KeeperException, InterruptedException {
if (invalidState) {
throw new IllegalStateException("ZkStateWriter has seen a tragic error, this instance can no longer be used");
}
- if (!hasPendingUpdates()) return clusterState;
+ if ((updates == this.updates)
+ && !hasPendingUpdates()) {
+ return clusterState;
+ }
Timer.Context timerContext = stats.time("update_state");
boolean success = false;
try {
if (!updates.isEmpty()) {
- for (Map.Entry<String, DocCollection> entry : updates.entrySet()) {
+ for (Map.Entry<String, ZkWriteCommand> entry : updates.entrySet()) {
String name = entry.getKey();
String path = ZkStateReader.getCollectionPath(name);
- DocCollection c = entry.getValue();
+ ZkWriteCommand cmd = entry.getValue();
+ DocCollection c = cmd.collection;
+ if(cmd.ops != null && cmd.ops.isPreOp()) {
+ PerReplicaStates.persist(cmd.ops, path, reader.getZkClient());
+ clusterState = clusterState.copyWith(name,
+ cmd.collection.copyWith(PerReplicaStates.fetch(cmd.collection.getZNode(), reader.getZkClient(), null)));
+ }
+ if(!cmd.persistCollState) continue;
if (c == null) {
// let's clean up the state.json of this collection only, the rest should be clean by delete collection cmd
log.debug("going to delete state.json {}", path);
@@ -192,6 +246,14 @@ public class ZkStateWriter {
clusterState = clusterState.copyWith(name, newCollection);
}
}
+ if(cmd.ops != null && !cmd.ops.isPreOp()) {
+ PerReplicaStates.persist(cmd.ops, path, reader.getZkClient());
+ DocCollection currentCollState = clusterState.getCollection(cmd.name);
+ if ( currentCollState != null) {
+ clusterState = clusterState.copyWith(name,
+ currentCollState.copyWith(PerReplicaStates.fetch(currentCollState.getZNode(), reader.getZkClient(), null)));
+ }
+ }
}
updates.clear();
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
index d464863..2f71674 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkWriteCommand.java
@@ -17,16 +17,34 @@
package org.apache.solr.cloud.overseer;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.PerReplicaStates;
public class ZkWriteCommand {
+
public final String name;
public final DocCollection collection;
+
public final boolean noop;
+ // persist the collection state. If this is false, it means the collection state is not modified
+ public final boolean persistCollState;
+ public final PerReplicaStates.WriteOps ops;
+ public ZkWriteCommand(String name, DocCollection collection, PerReplicaStates.WriteOps replicaOps, boolean persistCollState) {
+ boolean isPerReplicaState = collection.isPerReplicaState();
+ this.name = name;
+ this.collection = collection;
+ this.noop = false;
+ this.ops = isPerReplicaState ? replicaOps : null;
+ this.persistCollState = isPerReplicaState ? persistCollState : true;
+ }
public ZkWriteCommand(String name, DocCollection collection) {
this.name = name;
this.collection = collection;
this.noop = false;
+ persistCollState = true;
+ this.ops = collection != null && collection.isPerReplicaState() ?
+ PerReplicaStates.WriteOps.touchChildren():
+ null;
}
/**
@@ -36,6 +54,8 @@ public class ZkWriteCommand {
this.noop = true;
this.name = null;
this.collection = null;
+ this.ops = null;
+ persistCollState = true;
}
public static ZkWriteCommand noop() {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
index 5ad7ff4..7724b58 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/SolrClientCloudManager.java
@@ -187,6 +187,9 @@ public class SolrClientCloudManager implements SolrCloudManager {
return EMPTY;
}
}
+ public SolrZkClient getZkClient() {
+ return zkClient;
+ }
@Override
public DistributedQueueFactory getDistributedQueueFactory() {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
index ebed0ff..5f8ee6b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.common.cloud;
+import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -23,15 +24,19 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
+import java.util.function.Supplier;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.KeeperException;
import org.noggit.JSONWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Immutable state of the cloud. Normally you can get the state by using
@@ -39,6 +44,8 @@ import org.noggit.JSONWriter;
* @lucene.experimental
*/
public class ClusterState implements JSONWriter.Writable {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
private final Map<String, CollectionRef> collectionStates, immutableCollectionStates;
private Set<String> liveNodes;
@@ -241,6 +248,12 @@ public class ClusterState implements JSONWriter.Writable {
Map<String,Object> props;
Map<String,Slice> slices;
+ if("true".equals(String.valueOf(objs.get(DocCollection.PER_REPLICA_STATE)))) {
+ log.info("a collection {} has per-replica state" , name);
+ //this collection has replica states stored outside
+ ReplicaStatesProvider rsp = REPLICASTATES_PROVIDER.get();
+ if (rsp instanceof StatesProvider) ((StatesProvider) rsp).isPerReplicaState = true;
+ }
@SuppressWarnings({"unchecked"})
Map<String, Object> sliceObjs = (Map<String, Object>) objs.get(DocCollection.SHARDS);
if (sliceObjs == null) {
@@ -383,5 +396,63 @@ public class ClusterState implements JSONWriter.Writable {
public int size() {
return collectionStates.size();
}
+ interface ReplicaStatesProvider {
+
+ Optional<ReplicaStatesProvider> get();
+
+ PerReplicaStates getStates();
+
+ }
+
+ private static final ReplicaStatesProvider EMPTYSTATEPROVIDER = new ReplicaStatesProvider() {
+ @Override
+ public Optional<ReplicaStatesProvider> get() {
+ return Optional.empty();
+ }
+
+ @Override
+ public PerReplicaStates getStates() {
+ throw new RuntimeException("Invalid operation");
+ }
+
+ };
+
+ private static ThreadLocal<ReplicaStatesProvider> REPLICASTATES_PROVIDER = new ThreadLocal<>();
+
+
+ public static ReplicaStatesProvider getReplicaStatesProvider() {
+ return (REPLICASTATES_PROVIDER.get() == null)? EMPTYSTATEPROVIDER: REPLICASTATES_PROVIDER.get() ;
+ }
+ public static void initReplicaStateProvider(Supplier<PerReplicaStates> replicaStatesSupplier) {
+ REPLICASTATES_PROVIDER.set(new StatesProvider(replicaStatesSupplier));
+ }
+
+
+ public static void clearReplicaStateProvider(){
+ REPLICASTATES_PROVIDER.remove();
+ }
+
+ private static class StatesProvider implements ReplicaStatesProvider {
+ private final Supplier<PerReplicaStates> replicaStatesSupplier;
+ private PerReplicaStates perReplicaStates;
+ private boolean isPerReplicaState = false;
+
+ public StatesProvider(Supplier<PerReplicaStates> replicaStatesSupplier) {
+ this.replicaStatesSupplier = replicaStatesSupplier;
+ }
+
+ @Override
+ public Optional<ReplicaStatesProvider> get() {
+ return isPerReplicaState ? Optional.of(this) : Optional.empty();
+ }
+
+ @Override
+ public PerReplicaStates getStates() {
+ if(perReplicaStates == null) perReplicaStates = replicaStatesSupplier.get();
+ return perReplicaStates;
+ }
+
+ }
+
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index c35ee8a..7839532 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.common.cloud;
+import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
@@ -30,6 +31,8 @@ import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import org.noggit.JSONWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
@@ -42,9 +45,12 @@ import static org.apache.solr.common.util.Utils.toJSONString;
* Models a Collection in zookeeper (but that Java name is obviously taken, hence "DocCollection")
*/
public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
public static final String DOC_ROUTER = "router";
public static final String SHARDS = "shards";
+ public static final String PER_REPLICA_STATE = "perReplicaState";
private final int znodeVersion;
@@ -55,12 +61,16 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
private final Map<String, List<Replica>> nodeNameReplicas;
private final Map<String, List<Replica>> nodeNameLeaderReplicas;
private final DocRouter router;
+ private final String znode;
private final Integer replicationFactor;
private final Integer numNrtReplicas;
private final Integer numTlogReplicas;
private final Integer numPullReplicas;
+ private final Boolean perReplicaState;
private final Boolean readOnly;
+ private final Map<String, Replica> replicaMap = new HashMap<>();
+ private volatile PerReplicaStates perReplicaStates;
public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
this(name, slices, props, router, Integer.MAX_VALUE);
@@ -86,6 +96,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
this.numNrtReplicas = (Integer) verifyProp(props, NRT_REPLICAS, 0);
this.numTlogReplicas = (Integer) verifyProp(props, TLOG_REPLICAS, 0);
this.numPullReplicas = (Integer) verifyProp(props, PULL_REPLICAS, 0);
+ this.perReplicaState = (Boolean) verifyProp(props, PER_REPLICA_STATE, Boolean.FALSE);
Boolean readOnly = (Boolean) verifyProp(props, READ_ONLY);
this.readOnly = readOnly == null ? Boolean.FALSE : readOnly;
@@ -98,13 +109,42 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
}
for (Replica replica : slice.getValue()) {
addNodeNameReplica(replica);
+ if(perReplicaState) {
+ replicaMap.put(replica.getName(), replica);
+ }
}
}
this.activeSlicesArr = activeSlices.values().toArray(new Slice[activeSlices.size()]);
this.router = router;
+ this.znode = ZkStateReader.getCollectionPath(name);
assert name != null && slices != null;
}
+ /**Returns a copy of this collection with the given {@link PerReplicaStates} applied.
+ * Used to create a new collection state when only per-replica states have changed; returns this instance if nothing is modified.
+ */
+ public DocCollection copyWith( PerReplicaStates newPerReplicaStates) {
+ log.debug("collection :{} going to be updated : per-replica state :{} -> {}",
+ name,
+ getChildNodesVersion(), newPerReplicaStates.cversion);
+ if(getChildNodesVersion() == newPerReplicaStates.cversion) return this;
+ Set<String> modifiedReplicas = PerReplicaStates.findModifiedReplicas(newPerReplicaStates, this.perReplicaStates);
+ if(modifiedReplicas.isEmpty()) return this; //nothing is modified
+ Map<String, Slice> modifiedShards = new HashMap<>(getSlicesMap());
+ for (String s : modifiedReplicas) {
+ Replica replica = getReplica(s);
+ if(replica != null) {
+ Replica newReplica = replica.copyWith(newPerReplicaStates.get(s));
+ Slice shard = modifiedShards.get(replica.shard);
+ modifiedShards.put(replica.shard, shard.copyWith(newReplica));
+ }
+ }
+ DocCollection result = new DocCollection(getName(), modifiedShards, propMap, router, znodeVersion);
+ result.perReplicaStates = newPerReplicaStates;
+ return result;
+
+ }
+
private void addNodeNameReplica(Replica replica) {
List<Replica> replicas = nodeNameReplicas.get(replica.getNodeName());
if (replicas == null) {
@@ -138,6 +178,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
return Integer.parseInt(o.toString());
case READ_ONLY:
return Boolean.parseBoolean(o.toString());
+ case PER_REPLICA_STATE:
+ return Boolean.parseBoolean(o.toString());
case "snitch":
default:
return o;
@@ -224,6 +266,16 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
public int getZNodeVersion(){
return znodeVersion;
}
+ public int getChildNodesVersion() {
+ return perReplicaStates == null ? -1 : perReplicaStates.cversion;
+ }
+
+ public boolean isModified(int dataVersion, int childVersion) {
+ if(dataVersion > znodeVersion) return true;
+ if(childVersion > getChildNodesVersion()) return true;
+ return false;
+
+ }
/**
* @return replication factor for this collection or null if no
@@ -232,7 +284,12 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
public Integer getReplicationFactor() {
return replicationFactor;
}
-
+
+ public String getZNode(){
+ return znode;
+ }
+
+
public DocRouter getRouter() {
return router;
}
@@ -255,6 +312,9 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
}
public Replica getReplica(String coreNodeName) {
+ if(perReplicaState) {
+ return replicaMap.get(coreNodeName);
+ }
for (Slice slice : slices.values()) {
Replica replica = slice.getReplica(coreNodeName);
if (replica != null) return replica;
@@ -375,6 +435,14 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
public Integer getNumPullReplicas() {
return numPullReplicas;
}
+ public boolean isPerReplicaState() {
+ return Boolean.TRUE.equals(perReplicaState);
+ }
+
+ public PerReplicaStates getPerReplicaStates() {
+ return perReplicaStates;
+ }
+
public int getExpectedReplicaCount(Replica.Type type, int def) {
Integer result = null;
@@ -382,5 +450,6 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
if (type == Replica.Type.PULL) result = numPullReplicas;
if (type == Replica.Type.TLOG) result = numTlogReplicas;
return result == null ? def : result;
+
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
new file mode 100644
index 0000000..3eb2ca2
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
@@ -0,0 +1,587 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+
+import org.apache.solr.cluster.api.SimpleMap;
+import org.apache.solr.common.MapWriter;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.annotation.JsonProperty;
+import org.apache.solr.common.util.ReflectMapWriter;
+import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.WrappedSimpleMap;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.Collections.singletonList;
+import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.common.params.CommonParams.VERSION;
+
+/**
+ * This represents the individual replica states in a collection.
+ * This is an immutable object: when states are modified, a new instance is constructed.
+ */
+public class PerReplicaStates implements ReflectMapWriter {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ public static final char SEPARATOR = ':';
+
+
+ @JsonProperty
+ public final String path;
+
+ @JsonProperty
+ public final int cversion;
+
+ @JsonProperty
+ public final SimpleMap<State> states;
+
+  /**
+   * Builds the per-replica states from the serialized child entries of a node.
+   * <p>
+   * Entries that {@link State#parse(String)} cannot interpret are skipped. When several
+   * entries refer to the same replica they are chained into a single {@link State} through
+   * {@code State.insert}, so the map holds exactly one entry per replica.
+   *
+   * @param path     the node whose children were read
+   * @param cversion the child version of that node at the time of reading
+   * @param states   the serialized state entries
+   */
+  public PerReplicaStates(String path, int cversion, List<String> states) {
+    this.path = path;
+    this.cversion = cversion;
+
+    Map<String, State> byReplica = new LinkedHashMap<>();
+    for (String serialized : states) {
+      State parsed = State.parse(serialized);
+      if (parsed == null) {
+        continue;
+      }
+      State previous = byReplica.get(parsed.replica);
+      byReplica.put(parsed.replica, previous == null ? parsed : parsed.insert(previous));
+    }
+    this.states = new WrappedSimpleMap<>(byReplica);
+  }
+
+  /**
+   * Computes the names of the replicas whose entries differ between two snapshots.
+   * <p>
+   * A replica is reported when its entry in {@code old} is not equal to the one in {@code fresh}
+   * (including when it is missing from {@code fresh}), or when it is present only in {@code fresh}.
+   * If {@code fresh} is null, every replica of {@code old} is reported.
+   *
+   * @param old   the earlier snapshot; it is dereferenced unconditionally
+   * @param fresh the later snapshot, may be null
+   */
+  public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {
+    Set<String> modified = new HashSet<>();
+    if (fresh == null) {
+      old.states.forEachKey(modified::add);
+      return modified;
+    }
+    old.states.forEachEntry((name, oldState) -> {
+      // changed in, or missing from, the fresh snapshot
+      if (!Objects.equals(fresh.get(name), oldState)) {
+        modified.add(name);
+      }
+    });
+    fresh.states.forEachEntry((name, freshState) -> {
+      // present only in the fresh snapshot
+      if (old.get(name) == null) {
+        modified.add(name);
+      }
+    });
+    return modified;
+  }
+
+  /**
+   * Persists the operations currently held by {@code ops}, retrying once if the write
+   * fails because the state it was computed from is stale.
+   * <p>
+   * A {@link KeeperException.NodeExistsException} or {@link KeeperException.NoNodeException}
+   * from the first attempt is treated as a stale state: the latest states are fetched from
+   * {@code znode}, the operations are recomputed against them and persisted again. A failure
+   * of that second attempt propagates to the caller.
+   */
+  public static void persist(WriteOps ops, String znode, SolrZkClient zkClient) throws KeeperException, InterruptedException {
+    try {
+      persist(ops.get(), znode, zkClient);
+      return;
+    } catch (KeeperException.NodeExistsException | KeeperException.NoNodeException e) {
+      //state is stale
+      log.info("stale state for {} . retrying...", znode);
+    }
+    PerReplicaStates latest = PerReplicaStates.fetch(znode, zkClient, null);
+    persist(ops.get(latest), znode, zkClient);
+    log.info("retried for stale state {}, succeeded", znode);
+  }
+
+  /**
+   * Persist a set of operations to Zookeeper as a single multi-op call.
+   * <p>
+   * Each {@link Op.Type#ADD} becomes a create of a persistent child named after the serialized
+   * state, and every other operation becomes an unconditional (version -1) delete of that child.
+   * Nothing is written when {@code operations} is null or empty.
+   *
+   * @param operations the operations to apply, may be null or empty
+   * @param znode      the parent node of the state entries
+   * @param zkClient   the client used to talk to Zookeeper
+   * @throws KeeperException      if the multi-op fails; the failure is logged and rethrown
+   * @throws InterruptedException if the calling thread is interrupted
+   */
+  public static void persist(List<Op> operations, String znode, SolrZkClient zkClient) throws KeeperException, InterruptedException {
+    if (operations == null || operations.isEmpty()) return;
+    log.debug("Per-replica state being persisted for :{}, ops: {}", znode, operations);
+
+    List<org.apache.zookeeper.Op> ops = new ArrayList<>(operations.size());
+    for (Op op : operations) {
+      //the state of the replica is being updated
+      String path = znode + "/" + op.state.asString;
+      List<ACL> acls = zkClient.getZkACLProvider().getACLsToAdd(path);
+      ops.add(op.typ == Op.Type.ADD ?
+          org.apache.zookeeper.Op.create(path, null, acls, CreateMode.PERSISTENT) :
+          org.apache.zookeeper.Op.delete(path, -1));
+    }
+    try {
+      zkClient.multi(ops, true);
+    } catch (KeeperException e) {
+      // Listing the children is purely diagnostic; it must never replace the original failure.
+      List<String> children = null;
+      try {
+        children = zkClient.getChildren(znode, null, true);
+      } catch (KeeperException diagnosticFailure) {
+        e.addSuppressed(diagnosticFailure);
+      }
+      log.error("multi op exception : {}, children of {} : {}", e.getMessage(), znode, children, e);
+      throw e;
+    }
+
+    if (log.isDebugEnabled()) {
+      //nocommit : diagnostic only
+      try {
+        Stat stat = zkClient.exists(znode, null, true);
+        if (stat != null) {
+          log.debug("After update, cversion : {}", stat.getCversion());
+        }
+      } catch (KeeperException ignored) {
+        // the write has already succeeded; failing to read the stat only loses a debug line
+      }
+    }
+  }
+
+
+  /**
+   * Fetch the latest {@link PerReplicaStates}.
+   * <p>
+   * When {@code current} is given, the {@link Stat#getCversion()} of {@code current.path} is checked
+   * first: if that node no longer exists an empty instance with cversion -1 is returned, and if the
+   * cversion is unchanged the same {@code current} object is returned. Otherwise the children of
+   * {@code path} are read and a new instance is built from them.
+   *
+   * @param path     the node whose children hold the state entries
+   * @param zkClient the client used to talk to Zookeeper
+   * @param current  the instance already held by the caller, or null to always read
+   * @throws SolrException wrapping any {@link KeeperException} or {@link InterruptedException}
+   */
+  public static PerReplicaStates fetch(String path, SolrZkClient zkClient, PerReplicaStates current) {
+    try {
+      if (current != null) {
+        Stat currentStat = zkClient.exists(current.path, null, true);
+        if (currentStat == null) {
+          return new PerReplicaStates(path, -1, Collections.emptyList());
+        }
+        if (currentStat.getCversion() == current.cversion) {
+          return current; // not modified
+        }
+      }
+      Stat childStat = new Stat();
+      List<String> childNames = zkClient.getChildren(path, null, childStat, true);
+      return new PerReplicaStates(path, childStat.getCversion(), Collections.unmodifiableList(childNames));
+    } catch (KeeperException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error fetching per-replica states", e);
+    } catch (InterruptedException e) {
+      SolrZkClient.checkInterrupted(e);
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Thread interrupted when loading per-replica states from " + path, e);
+    }
+  }
+
+
+  /**
+   * Appends a DELETE operation for {@code rs} and for every entry on its duplicate chain.
+   *
+   * @param ops the list to append to; it is also the return value
+   * @param rs  the head of the chain, may be null (nothing is appended then)
+   */
+  private static List<Op> addDeleteStaleNodes(List<Op> ops, State rs) {
+    for (State node = rs; node != null; node = node.duplicate) {
+      ops.add(new Op(Op.Type.DELETE, node));
+    }
+    return ops;
+  }
+
+  /**
+   * Extracts the replica name, i.e. the text before the first {@link #SEPARATOR}.
+   *
+   * @return the name, or null if there is no separator or it is the first character
+   */
+  public static String getReplicaName(String s) {
+    int sep = s.indexOf(SEPARATOR);
+    return sep > 0 ? s.substring(0, sep) : null;
+  }
+
+  /**
+   * Looks up the state entry of the named replica in {@link #states}.
+   */
+  public State get(String replica) {
+    return states.get(replica);
+  }
+
+  /**
+   * A single change to the per-replica state entries: the addition or the deletion
+   * of the node that represents {@link #state}.
+   */
+  public static class Op {
+
+    /** The kind of change an {@link Op} performs. */
+    public enum Type {
+      //add a new node
+      ADD,
+      //delete an existing node
+      DELETE
+    }
+
+    public final Type typ;
+    public final State state;
+
+    public Op(Type typ, State replicaState) {
+      this.typ = typ;
+      this.state = replicaState;
+    }
+
+    @Override
+    public String toString() {
+      String kind = typ.toString();
+      return kind + " : " + state;
+    }
+  }
+
+
+  /**
+   * The state of a replica as stored as a node under /collections/collection-name/state.json/replica-state
+   * <p>
+   * The serialized form is {@code replica:version:state} with an optional trailing {@code :L}
+   * that marks the leader, where {@code state} is the short name of a {@link Replica.State}.
+   * Equality and hash code are based on that serialized form only.
+   */
+  public static class State implements MapWriter {
+
+    public final String replica;
+
+    public final Replica.State state;
+
+    public final Boolean isLeader;
+
+    public final int version;
+
+    public final String asString;
+
+    /**
+     * if there are multiple entries for the same replica, e.g: core_node_1:12:A core_node_1:13:D
+     * <p>
+     * the entry with '13' is the latest and the one with '12' is considered a duplicate
+     * <p>
+     * These are unlikely, but possible
+     */
+    final State duplicate;
+
+    /**
+     * Builds a state from an already split serialized entry.
+     *
+     * @throws NumberFormatException if the version piece is not an integer
+     */
+    private State(String serialized, List<String> pieces) {
+      this.asString = serialized;
+      replica = pieces.get(0);
+      version = Integer.parseInt(pieces.get(1));
+      String encodedStatus = pieces.get(2);
+      this.state = Replica.getState(encodedStatus);
+      isLeader = pieces.size() > 3 && "L".equals(pieces.get(3));
+      duplicate = null;
+    }
+
+    /**
+     * Parses a serialized state entry.
+     *
+     * @return the parsed state, or null if the text is not a state entry: it has fewer than
+     *     three ':' separated pieces, or its version piece is not an integer
+     */
+    public static State parse(String serialized) {
+      List<String> pieces = StrUtils.splitSmart(serialized, ':');
+      if (pieces.size() < 3) return null;
+      try {
+        return new State(serialized, pieces);
+      } catch (NumberFormatException e) {
+        // Treat it like any other unparseable entry instead of failing the whole read.
+        log.debug("Ignoring malformed per-replica state entry: {}", serialized);
+        return null;
+      }
+    }
+
+    public State(String replica, Replica.State state, Boolean isLeader, int version) {
+      this(replica, state, isLeader, version, null);
+    }
+
+    /**
+     * @param state     null is replaced by {@link Replica.State#ACTIVE}
+     * @param isLeader  null is replaced by {@link Boolean#FALSE}
+     * @param duplicate an older entry for the same replica, or null
+     */
+    public State(String replica, Replica.State state, Boolean isLeader, int version, State duplicate) {
+      this.replica = replica;
+      this.state = state == null ? Replica.State.ACTIVE : state;
+      this.isLeader = isLeader == null ? Boolean.FALSE : isLeader;
+      this.version = version;
+      asString = serialize();
+      this.duplicate = duplicate;
+    }
+
+    @Override
+    public void writeMap(EntryWriter ew) throws IOException {
+      ew.put(NAME, replica);
+      ew.put(VERSION, version);
+      ew.put(ZkStateReader.STATE_PROP, state.toString());
+      if (isLeader) ew.put(Slice.LEADER, isLeader);
+      ew.putIfNotNull("duplicate", duplicate);
+    }
+
+    /**
+     * Combines this entry with another entry for the same replica. The entry with the
+     * higher version (this one on a tie) becomes the head and the other is chained behind it.
+     */
+    private State insert(State duplicate) {
+      assert this.replica.equals(duplicate.replica);
+      if (this.version >= duplicate.version) {
+        if (this.duplicate != null) {
+          // keep the chain this entry already carries behind the newly inserted entry
+          duplicate = new State(duplicate.replica, duplicate.state, duplicate.isLeader, duplicate.version, this.duplicate);
+        }
+        return new State(this.replica, this.state, this.isLeader, this.version, duplicate);
+      } else {
+        return duplicate.insert(this);
+      }
+    }
+
+    /**
+     * fetch duplicates entries for this replica
+     */
+    List<State> getDuplicates() {
+      if (duplicate == null) return Collections.emptyList();
+      List<State> result = new ArrayList<>();
+      State current = duplicate;
+      while (current != null) {
+        result.add(current);
+        current = current.duplicate;
+      }
+      return result;
+    }
+
+    /** Produces the {@code replica:version:state[:L]} text for this entry. */
+    private String serialize() {
+      StringBuilder sb = new StringBuilder(replica)
+          .append(":")
+          .append(version)
+          .append(":")
+          .append(state.shortName);
+      if (isLeader) sb.append(":").append("L");
+      return sb.toString();
+    }
+
+
+    @Override
+    public String toString() {
+      return asString;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof State) {
+        State that = (State) o;
+        return Objects.equals(this.asString, that.asString);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return asString.hashCode();
+    }
+  }
+
+
+ public static abstract class WriteOps {
+ private PerReplicaStates rs;
+ List<Op> ops;
+ private boolean preOp = true;
+
+    /**
+     * Creates the operations that change the state of one replica.
+     * <p>
+     * If the replica already has an entry, a new entry with the next version and the same leader
+     * flag is added and the existing entry, with its duplicates, is deleted. Otherwise a
+     * non-leader entry with version 0 is added.
+     *
+     * @param replica  name of the replica
+     * @param newState the new state
+     * @param rs       the states to compute against; a null value yields a null result (see {@code init})
+     */
+    public static WriteOps flipState(String replica, Replica.State newState, PerReplicaStates rs) {
+      return new WriteOps() {
+        @Override
+        protected List<Op> refresh(PerReplicaStates rs) {
+          List<Op> result = new ArrayList<>(2);
+          State current = rs.get(replica);
+          if (current != null) {
+            result.add(new Op(Op.Type.ADD, new State(replica, newState, current.isLeader, current.version + 1)));
+            addDeleteStaleNodes(result, current);
+          } else {
+            result.add(new Op(Op.Type.ADD, new State(replica, newState, Boolean.FALSE, 0)));
+          }
+          if (log.isDebugEnabled()) {
+            log.debug("flipState on {}, {} -> {}, ops :{}", rs.path, replica, newState, result);
+          }
+          return result;
+        }
+      }.init(rs);
+    }
+
+    /**
+     * Returns the states the current operations were last computed against.
+     */
+    public PerReplicaStates getPerReplicaStates() {
+      return this.rs;
+    }
+
+
+    /**
+     * Switch a collection from/to perReplicaState=true.
+     * <p>
+     * Enabling adds a version 0 entry for every replica of {@code coll}; disabling deletes
+     * every entry currently present in the per-replica states.
+     */
+    public static WriteOps modifyCollection(DocCollection coll, boolean enable, PerReplicaStates prs) {
+      return new WriteOps() {
+        @Override
+        List<Op> refresh(PerReplicaStates prs) {
+          List<Op> result = new ArrayList<>();
+          if (enable) {
+            coll.forEachReplica((s, r) -> result.add(new Op(Op.Type.ADD, new State(r.getName(), r.getState(), r.isLeader(), 0))));
+          } else {
+            prs.states.forEachEntry((s, state) -> result.add(new Op(Op.Type.DELETE, state)));
+          }
+          return result;
+        }
+      }.init(prs);
+    }
+
+    /**
+     * Flip the leader replica to a new one.
+     * <p>
+     * The entry of {@code next} is replaced by an ACTIVE leader entry (or created if missing),
+     * unless it is already the leader. Every other replica in {@code allReplicas} that is
+     * currently marked leader gets a replacement entry without the leader flag.
+     *
+     * @param allReplicas allReplicas of the shard
+     * @param next        next leader, may be null when only the old leader is to be unset
+     */
+    public static WriteOps flipLeader(Set<String> allReplicas, String next, PerReplicaStates rs) {
+      return new WriteOps() {
+
+        @Override
+        protected List<Op> refresh(PerReplicaStates rs) {
+          List<Op> result = new ArrayList<>(4);
+          if (next != null) {
+            State nextState = rs.get(next);
+            if (nextState == null) {
+              //there is no entry for the new leader.
+              //create one
+              result.add(new Op(Op.Type.ADD, new State(next, Replica.State.ACTIVE, Boolean.TRUE, 0)));
+            } else if (!nextState.isLeader) {
+              result.add(new Op(Op.Type.ADD, new State(nextState.replica, Replica.State.ACTIVE, Boolean.TRUE, nextState.version + 1)));
+              result.add(new Op(Op.Type.DELETE, nextState));
+            }
+            //else do not do anything , that node is the leader
+          }
+
+          //now go through all other replicas and unset previous leader
+          for (String r : allReplicas) {
+            State st = rs.get(r);
+            if (st == null) continue;//unlikely
+            if (Objects.equals(r, next) || !st.isLeader) continue;
+            //some other replica is the leader now. unset
+            result.add(new Op(Op.Type.ADD, new State(st.replica, st.state, Boolean.FALSE, st.version + 1)));
+            result.add(new Op(Op.Type.DELETE, st));
+          }
+          if (log.isDebugEnabled()) {
+            log.debug("flipLeader on:{}, {} -> {}, ops: {}", rs.path, allReplicas, next, result);
+          }
+          return result;
+        }
+
+      }.init(rs);
+    }
+
+    /**
+     * Delete a replica entry, together with its duplicates, from per-replica states.
+     *
+     * @param replica name of the replica to be deleted
+     */
+    public static WriteOps deleteReplica(String replica, PerReplicaStates rs) {
+      return new WriteOps() {
+        @Override
+        protected List<Op> refresh(PerReplicaStates rs) {
+          if (rs == null) {
+            return Collections.emptyList();
+          }
+          return addDeleteStaleNodes(new ArrayList<>(), rs.get(replica));
+        }
+      }.init(rs);
+    }
+
+    /**
+     * Creates the single operation that adds a version 0 entry for a new replica.
+     *
+     * @param replica  name of the replica
+     * @param state    initial state of the replica
+     * @param isLeader whether the new entry carries the leader flag
+     */
+    public static WriteOps addReplica(String replica, Replica.State state, boolean isLeader, PerReplicaStates rs) {
+      return new WriteOps() {
+        @Override
+        protected List<Op> refresh(PerReplicaStates rs) {
+          State added = new State(replica, state, isLeader, 0);
+          return singletonList(new Op(Op.Type.ADD, added));
+        }
+      }.init(rs);
+    }
+
+    /**
+     * mark a bunch of replicas as DOWN
+     * <p>
+     * A replica without an entry gets a version 0 DOWN entry. A replica that is already DOWN
+     * and not the leader is left alone. Any other replica gets a non-leader DOWN entry with
+     * the next version, and its existing entry and duplicates are deleted.
+     */
+    public static WriteOps downReplicas(List<String> replicas, PerReplicaStates rs) {
+      return new WriteOps() {
+        @Override
+        List<Op> refresh(PerReplicaStates rs) {
+          List<Op> result = new ArrayList<>();
+          for (String replica : replicas) {
+            State existing = rs.get(replica);
+            if (existing == null) {
+              result.add(new Op(Op.Type.ADD, new State(replica, Replica.State.DOWN, Boolean.FALSE, 0)));
+              continue;
+            }
+            if (existing.state == Replica.State.DOWN && !existing.isLeader) continue;
+            result.add(new Op(Op.Type.ADD, new State(replica, Replica.State.DOWN, Boolean.FALSE, existing.version + 1)));
+            addDeleteStaleNodes(result, existing);
+          }
+          if (log.isDebugEnabled()) {
+            log.debug("for coll: {} down replicas {}, ops {}", rs, replicas, result);
+          }
+          return result;
+        }
+      }.init(rs);
+    }
+
+    /**
+     * Just creates and deletes a dummy entry so that the {@link Stat#getCversion()} of state.json
+     * is updated.
+     * <p>
+     * Unlike the other factories, the returned instance is not a pre-op (see {@link #isPreOp()}),
+     * and its operations are computed immediately without any existing states.
+     */
+    public static WriteOps touchChildren() {
+      WriteOps touch = new WriteOps() {
+        @Override
+        List<Op> refresh(PerReplicaStates rs) {
+          State dummy = new State(".dummy." + System.nanoTime(), Replica.State.DOWN, Boolean.FALSE, 0);
+          List<Op> result = new ArrayList<>();
+          result.add(new Op(Op.Type.ADD, dummy));
+          result.add(new Op(Op.Type.DELETE, dummy));
+          if (log.isDebugEnabled()) {
+            log.debug("touchChildren {}", result);
+          }
+          return result;
+        }
+      };
+      touch.preOp = false;
+      touch.ops = touch.refresh(null);
+      return touch;
+    }
+
+    /**
+     * Computes the initial operations against {@code rs}.
+     *
+     * @return this instance, or null when {@code rs} is null
+     */
+    WriteOps init(PerReplicaStates rs) {
+      if (rs != null) {
+        get(rs);
+        return this;
+      }
+      return null;
+    }
+
+    /**
+     * Returns the operations computed most recently, without recomputing them.
+     */
+    public List<Op> get() {
+      return this.ops;
+    }
+
+    /**
+     * Recomputes the operations against the given states and remembers both.
+     *
+     * @return the recomputed operations; never null, a null result of {@code refresh} becomes an empty list
+     */
+    public List<Op> get(PerReplicaStates rs) {
+      List<Op> refreshed = refresh(rs);
+      ops = refreshed == null ? Collections.<Op>emptyList() : refreshed;
+      this.rs = rs;
+      return ops;
+    }
+
+    /**
+     * Whether these operations are to be executed before the collection's state.json is persisted.
+     * This is true unless the instance was created by {@link #touchChildren()}.
+     */
+    public boolean isPreOp() {
+      return this.preOp;
+    }
+
+ /**
+ * Computes the operations against the given states.
+ * <p>
+ * If a multi operation fails because the state got modified from behind,
+ * this is called again with the freshly fetched state so that the operation can be retried.
+ *
+ * @param prs The new state
+ * @return the operations to persist; a null result is treated as an empty list by {@code get(PerReplicaStates)}
+ */
+ abstract List<Op> refresh(PerReplicaStates prs);
+
+    /** Renders the operations computed most recently. */
+    @Override
+    public String toString() {
+      List<Op> current = ops;
+      return current.toString();
+    }
+ }
+
+  /**
+   * Renders the path, the child version and all state entries, for logging.
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("{");
+    sb.append(path).append("/[").append(cversion).append("]: [");
+    appendStates(sb);
+    sb.append("]}");
+    return sb.toString();
+  }
+
+  /**
+   * Appends every state entry, each followed by its duplicates, as a comma separated list.
+   *
+   * @param sb the builder to append to; it is also the return value
+   */
+  private StringBuilder appendStates(StringBuilder sb) {
+    states.forEachEntry(new BiConsumer<String, State>() {
+      int count = 0;
+      @Override
+      public void accept(String s, State state) {
+        if (count++ > 0) sb.append(", ");
+        sb.append(state.asString);
+        // separate the duplicates too, otherwise they run into the preceding entry
+        for (State d : state.getDuplicates()) sb.append(", ").append(d.asString);
+      }
+    });
+    return sb;
+  }
+
+}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index 5f3fd9b..e6c412c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -17,6 +17,7 @@
package org.apache.solr.common.cloud;
import java.io.IOException;
+import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -29,12 +30,15 @@ import java.util.function.BiPredicate;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.util.Utils;
import org.noggit.JSONWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.solr.common.ConditionalMapWriter.NON_NULL_VAL;
import static org.apache.solr.common.ConditionalMapWriter.dedupeKeyPredicate;
import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
public class Replica extends ZkNodeProps implements MapWriter {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/**
* The replica's state. In general, if the node the replica is hosted on is
@@ -53,7 +57,7 @@ public class Replica extends ZkNodeProps implements MapWriter {
* {@link ClusterState#liveNodesContain(String)}).
* </p>
*/
- ACTIVE,
+ ACTIVE("A"),
/**
* The first state before {@link State#RECOVERING}. A node in this state
@@ -64,13 +68,13 @@ public class Replica extends ZkNodeProps implements MapWriter {
* should not be relied on.
* </p>
*/
- DOWN,
+ DOWN("D"),
/**
* The node is recovering from the leader. This might involve peer-sync,
* full replication or finding out things are already in sync.
*/
- RECOVERING,
+ RECOVERING("R"),
/**
* Recovery attempts have not worked, something is not right.
@@ -80,8 +84,16 @@ public class Replica extends ZkNodeProps implements MapWriter {
* cluster and it's state should be discarded.
* </p>
*/
- RECOVERY_FAILED;
-
+ RECOVERY_FAILED("F");
+
+ /**short name for a state. Used to encode this in the state node see {@link PerReplicaStates.State}
+ */
+ public final String shortName;
+
+    /** Binds the enum constant to the short name used to encode it. */
+    State(String shortName) {
+      this.shortName = shortName;
+    }
+
@Override
public String toString() {
return super.toString().toLowerCase(Locale.ROOT);
@@ -89,7 +101,7 @@ public class Replica extends ZkNodeProps implements MapWriter {
/** Converts the state string to a State instance. */
public static State getState(String stateStr) {
- return stateStr == null ? null : State.valueOf(stateStr.toUpperCase(Locale.ROOT));
+ return stateStr == null ? null : Replica.State.valueOf(stateStr.toUpperCase(Locale.ROOT));
}
}
@@ -125,6 +137,8 @@ public class Replica extends ZkNodeProps implements MapWriter {
public final Type type;
public final String shard, collection;
+ private PerReplicaStates.State replicaState;
+
// mutable
private State state;
@@ -177,9 +191,18 @@ public class Replica extends ZkNodeProps implements MapWriter {
this.shard = String.valueOf(details.get("shard"));
this.core = String.valueOf(details.get("core"));
this.node = String.valueOf(details.get("node_name"));
- type = Replica.Type.valueOf(String.valueOf(details.getOrDefault(ZkStateReader.REPLICA_TYPE, "NRT")));
- state = State.getState(String.valueOf(details.getOrDefault(ZkStateReader.STATE_PROP, "active")));
+
this.propMap.putAll(details);
+ ClusterState.getReplicaStatesProvider().get().ifPresent(it -> {
+ log.debug("A replica {} state fetched from per-replica state", name);
+ replicaState = it.getStates().get(name);
+ if(replicaState!= null) {
+ propMap.put(ZkStateReader.STATE_PROP, replicaState.state.toString().toLowerCase(Locale.ROOT));
+ if (replicaState.isLeader) propMap.put(Slice.LEADER, "true");
+ }
+ }) ;
+ type = Replica.Type.valueOf(String.valueOf(propMap.getOrDefault(ZkStateReader.REPLICA_TYPE, "NRT")));
+ if(state == null) state = State.getState(String.valueOf(propMap.getOrDefault(ZkStateReader.STATE_PROP, "active")));
validate();
propMap.put(BASE_URL_PROP, UrlScheme.INSTANCE.getBaseUrlForNodeName(this.node));
}
@@ -273,6 +296,7 @@ public class Replica extends ZkNodeProps implements MapWriter {
}
public boolean isLeader() {
+ if(replicaState != null) return replicaState.isLeader;
return getBool(ZkStateReader.LEADER_PROP, false);
}
@@ -295,6 +319,21 @@ public class Replica extends ZkNodeProps implements MapWriter {
final String propertyValue = getStr(propertyKey);
return propertyValue;
}
+  /**
+   * Creates a copy of this replica whose state and leader flag follow the given per-replica state.
+   * <p>
+   * The copy starts from this replica's properties. With a null {@code state} the copy is marked
+   * DOWN and loses its leader flag. Otherwise the state is taken from {@code state}, and the
+   * leader flag is set or removed to match {@code state.isLeader}.
+   *
+   * @param state the per-replica state to apply, may be null
+   * @return a new replica; this instance is not modified
+   */
+  public Replica copyWith(PerReplicaStates.State state) {
+    log.debug("A replica is updated with new state : {}", state);
+    Map<String, Object> props = new LinkedHashMap<>(propMap);
+    if (state == null) {
+      props.put(ZkStateReader.STATE_PROP, State.DOWN.toString());
+      props.remove(Slice.LEADER);
+    } else {
+      props.put(ZkStateReader.STATE_PROP, state.state.toString());
+      if (state.isLeader) {
+        props.put(Slice.LEADER, "true");
+      } else {
+        // a leader flag copied from the old properties must not outlive the leadership
+        props.remove(Slice.LEADER);
+      }
+    }
+    Replica r = new Replica(name, props, collection, shard);
+    r.replicaState = state;
+    return r;
+  }
+
public Object clone() {
return new Replica(name, node, collection, shard, core, state, type, propMap);
@@ -305,6 +344,17 @@ public class Replica extends ZkNodeProps implements MapWriter {
ew.put(name, _allPropsWriter());
}
+  /** Lookup table from a state's short name to the state. */
+  private static final Map<String, State> STATES = new HashMap<>();
+  static {
+    for (State s : State.values()) {
+      STATES.put(s.shortName, s);
+    }
+  }
+
+  /**
+   * Resolves a state from its short name.
+   *
+   * @return the matching state, or null if the short name is unknown
+   */
+  public static State getState(String c) {
+    return STATES.get(c);
+  }
+
private MapWriter _allPropsWriter() {
BiPredicate<CharSequence, Object> p = dedupeKeyPredicate(new HashSet<>())
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
index 4378ef7..23796fb 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Slice.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.common.cloud;
+import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -25,11 +26,14 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.solr.common.cloud.Replica.Type;
import org.noggit.JSONWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.solr.common.util.Utils.toJSONString;
@@ -37,6 +41,8 @@ import static org.apache.solr.common.util.Utils.toJSONString;
* A Slice contains immutable information about a logical shard (all replicas that share the same shard id).
*/
public class Slice extends ZkNodeProps implements Iterable<Replica> {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
public final String collection;
/** Loads multiple slices into a Map from a generic Map that probably came from deserialized JSON. */
@@ -61,6 +67,16 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
return replicas.values().iterator();
}
+  /**
+   * Make a copy of this slice in which the given replica replaces, or is added next to,
+   * the replicas already present. The replica is keyed by its name.
+   */
+  public Slice copyWith(Replica modified) {
+    if (log.isDebugEnabled()) {
+      log.debug("modified replica : {}", modified);
+    }
+    Map<String, Replica> updated = new LinkedHashMap<>(replicas);
+    updated.put(modified.getName(), modified);
+    return new Slice(name, updated, propMap, collection);
+  }
/** The slice's state. */
public enum State {
@@ -210,7 +226,7 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
private Replica findLeader() {
for (Replica replica : replicas.values()) {
- if (replica.getStr(LEADER) != null) {
+ if (replica.isLeader()) {
assert replica.getType() == Type.TLOG || replica.getType() == Type.NRT: "Pull replica should not become leader!";
return replica;
}
@@ -235,6 +251,10 @@ public class Slice extends ZkNodeProps implements Iterable<Replica> {
return replicas.values();
}
+  /**
+   * Returns an unmodifiable view of the names of the replicas in this slice.
+   */
+  public Set<String> getReplicaNames() {
+    Set<String> names = replicas.keySet();
+    return Collections.unmodifiableSet(names);
+  }
+
/**
* Gets all replicas that match a predicate
*/
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 1ac21fe..7a12c92 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -331,6 +331,18 @@ public class SolrZkClient implements Closeable {
}
/**
+ * Returns children of the node at the path
+ */
+  public List<String> getChildren(final String path, final Watcher watcher, Stat stat, boolean retryOnConnLoss)
+      throws KeeperException, InterruptedException {
+    if (!retryOnConnLoss) {
+      // single attempt, straight against the keeper
+      return keeper.getChildren(path, wrapWatcher(watcher), stat);
+    }
+    // same call, routed through the retrying executor
+    return zkCmdExecutor.retryOperation(() -> keeper.getChildren(path, wrapWatcher(watcher) , stat));
+  }
+
+ /**
* Returns node's data
*/
public byte[] getData(final String path, final Watcher watcher, final Stat stat, boolean retryOnConnLoss)
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 7ff69cf..118dc05 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -23,11 +23,13 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -655,7 +657,7 @@ public class ZkStateReader implements SolrCloseable {
private class LazyCollectionRef extends ClusterState.CollectionRef {
private final String collName;
- private long lastUpdateTime;
+ private volatile long lastUpdateTime;
private DocCollection cachedDocCollection;
public LazyCollectionRef(String collName) {
@@ -675,7 +677,7 @@ public class ZkStateReader implements SolrCloseable {
exists = zkClient.exists(getCollectionPath(collName), null, true);
} catch (Exception e) {
}
- if (exists != null && exists.getVersion() == cachedDocCollection.getZNodeVersion()) {
+ if (exists != null && !cachedDocCollection.isModified(exists.getVersion(), exists.getCversion())) {
shouldFetch = false;
}
}
@@ -1195,9 +1197,11 @@ public class ZkStateReader implements SolrCloseable {
*/
class StateWatcher implements Watcher {
private final String coll;
+ private final String collectionPath;
StateWatcher(String coll) {
this.coll = coll;
+ collectionPath = getCollectionPath(coll);
}
@Override
@@ -1216,20 +1220,32 @@ public class ZkStateReader implements SolrCloseable {
Set<String> liveNodes = ZkStateReader.this.liveNodes;
if (log.isInfoEnabled()) {
log.info("A cluster state change: [{}] for collection [{}] has occurred - updating... (live nodes size: [{}])",
- event, coll, liveNodes.size());
+ event, coll, liveNodes.size());
}
- refreshAndWatch();
+ refreshAndWatch(event.getType());
}
+    /**
+     * Refreshes without a triggering event; delegates to {@code refreshAndWatch(null)}.
+     */
+    public void refreshAndWatch() {
+      this.refreshAndWatch(null);
+    }
+
/**
* Refresh collection state from ZK and leave a watch for future changes.
* As a side effect, updates {@link #clusterState} and {@link #watchedCollectionStates}
* with the results of the refresh.
*/
- public void refreshAndWatch() {
+ public void refreshAndWatch(EventType eventType) {
try {
+ if (eventType == null || eventType == EventType.NodeChildrenChanged) {
+ refreshAndWatchChildren();
+ if (eventType == EventType.NodeChildrenChanged) {
+ //only per-replica states modified. return
+ return;
+ }
+ }
+
DocCollection newState = fetchCollectionState(coll, this);
updateWatchedCollection(coll, newState);
synchronized (getUpdateLock()) {
@@ -1246,6 +1262,32 @@ public class ZkStateReader implements SolrCloseable {
log.error("Unwatched collection: [{}]", coll, e);
}
}
+
+    /**
+     * Reads the children of the collection node, registering this watcher on them, and
+     * publishes a collection state that carries the freshly read per-replica states.
+     * <p>
+     * If the collection is already in {@code watchedCollectionStates}, the known state is copied
+     * with the new per-replica states; otherwise the whole collection state is fetched.
+     * A missing collection node is logged and otherwise ignored.
+     */
+    private void refreshAndWatchChildren() throws KeeperException, InterruptedException {
+      Stat stat = new Stat();
+      try {
+        List<String> replicaStates = zkClient.getChildren(collectionPath, this, stat, true);
+        PerReplicaStates newStates = new PerReplicaStates(collectionPath, stat.getCversion(), replicaStates);
+        DocCollection oldState = watchedCollectionStates.get(coll);
+        DocCollection newState;
+        if (oldState == null) {
+          newState = fetchCollectionState(coll, null);
+        } else {
+          newState = oldState.copyWith(newStates);
+        }
+        updateWatchedCollection(coll, newState);
+        synchronized (getUpdateLock()) {
+          constructState(Collections.singleton(coll));
+        }
+        if (log.isDebugEnabled()) {
+          log.debug("updated per-replica states changed for: {}, ver: {} , new vals: {}", coll, stat.getCversion(), replicaStates);
+        }
+      } catch (NoNodeException e) {
+        log.info("{} is deleted, stop watching children", collectionPath);
+      }
+    }
}
/**
@@ -1419,9 +1461,19 @@ public class ZkStateReader implements SolrCloseable {
}
}
- private DocCollection fetchCollectionState(String coll, Watcher watcher) throws KeeperException, InterruptedException {
- String collectionPath = getCollectionPath(coll);
+ public DocCollection fetchCollectionState(String coll, Watcher watcher) throws KeeperException, InterruptedException {
+ String collectionPath = getCollectionPath(coll);
while (true) {
+ ClusterState.initReplicaStateProvider(() -> {
+ try {
+ PerReplicaStates replicaStates = getReplicaStates(new PerReplicaStates(collectionPath, 0, Collections.emptyList()));
+ log.info("per-replica-state ver: {} fetched for initializing {} ", replicaStates.cversion, collectionPath);
+ return replicaStates;
+ } catch (Exception e) {
+ //TODO
+ throw new RuntimeException(e);
+ }
+ });
try {
Stat stat = new Stat();
byte[] data = zkClient.getData(collectionPath, watcher, stat, true);
@@ -1439,6 +1491,8 @@ public class ZkStateReader implements SolrCloseable {
}
}
return null;
+ } finally {
+ ClusterState.clearReplicaStateProvider();
}
}
}
@@ -1566,11 +1620,26 @@ public class ZkStateReader implements SolrCloseable {
}
DocCollection state = clusterState.getCollectionOrNull(collection);
+ state = updatePerReplicaState(state);
if (stateWatcher.onStateChanged(state) == true) {
removeDocCollectionWatcher(collection, stateWatcher);
}
}
+  /**
+   * Makes sure a per-replica-state collection carries the latest per-replica states.
+   * <p>
+   * Collections that are null or do not use per-replica state are returned unchanged. Otherwise
+   * the states are fetched again; if a different instance comes back, a copy of the collection
+   * with the new states is stored through {@code updateWatchedCollection} and returned.
+   */
+  private DocCollection updatePerReplicaState(DocCollection c) {
+    if (c == null || !c.isPerReplicaState()) return c;
+    PerReplicaStates current = c.getPerReplicaStates();
+    PerReplicaStates newPrs = PerReplicaStates.fetch(c.getZNode(), zkClient, current);
+    if (newPrs == current) {
+      return c;
+    }
+    log.debug("just-in-time update for a fresh per-replica-state {}", c.getName());
+    DocCollection modifiedColl = c.copyWith(newPrs);
+    updateWatchedCollection(c.getName(), modifiedColl);
+    return modifiedColl;
+  }
+
/**
* Block until a CollectionStatePredicate returns true, or the wait times out
*
@@ -1804,7 +1873,9 @@ public class ZkStateReader implements SolrCloseable {
break;
}
} else {
- if (oldState.getZNodeVersion() >= newState.getZNodeVersion()) {
+ int oldCVersion = oldState.getPerReplicaStates() == null ? -1 : oldState.getPerReplicaStates().cversion;
+ int newCVersion = newState.getPerReplicaStates() == null ? -1 : newState.getPerReplicaStates().cversion;
+ if (oldState.getZNodeVersion() >= newState.getZNodeVersion() && oldCVersion >= newCVersion) {
// no change to state, but we might have been triggered by the addition of a
// state watcher, so run notifications
updated = true;
@@ -2198,7 +2269,17 @@ public class ZkStateReader implements SolrCloseable {
}
}
+ public PerReplicaStates getReplicaStates(String path) throws KeeperException, InterruptedException { // fetches the per-replica states stored under the given path
+ return PerReplicaStates.fetch(path, zkClient, null); // null: no previously fetched PerReplicaStates is supplied
+
+ }
+
+ public PerReplicaStates getReplicaStates(PerReplicaStates current) throws KeeperException, InterruptedException { // re-fetches states for current.path, passing 'current' along to fetch()
+ return PerReplicaStates.fetch(current.path, zkClient, current); // 'current' must be non-null: its path field is dereferenced here
+ }
+
public DocCollection getCollection(String collection) {
- return clusterState.getCollectionOrNull(collection);
+ return clusterState == null ? null : clusterState.getCollectionOrNull(collection); // null-safe: yields null while clusterState is still unset
}
+
}
diff --git a/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java b/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java
new file mode 100644
index 0000000..25a095d
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/common/cloud/TestPerReplicaStates.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.Replica.State;
+import org.apache.zookeeper.CreateMode;
+import org.junit.After;
+import org.junit.Before;
+
+public class TestPerReplicaStates extends SolrCloudTestCase { // exercises PerReplicaStates string encoding, diffing, and ZooKeeper-backed write ops
+ @Before
+ public void prepareCluster() throws Exception { // runs before each test: builds a cluster via configureCluster(4)
+ configureCluster(4)
+ .configure();
+ }
+
+ @After
+ public void tearDownCluster() throws Exception { // runs after each test: shuts the cluster down again
+ shutdownCluster();
+ }
+
+ public void testBasic() { // checks the string form of a single State and that parse() reads it back
+ PerReplicaStates.State rs = new PerReplicaStates.State("R1", State.ACTIVE, Boolean.FALSE, 1);
+ assertEquals("R1:1:A", rs.asString); // expected layout: name:version:state-letter
+
+ rs = new PerReplicaStates.State("R1", State.DOWN, Boolean.TRUE, 1);
+ assertEquals("R1:1:D:L", rs.asString); // a leader is expected to carry a trailing ":L"
+ rs = PerReplicaStates.State.parse (rs.asString);
+ assertEquals(State.DOWN, rs.state);
+
+ }
+
+ public void testEntries() { // checks duplicate handling and findModifiedReplicas on in-memory entries
+ PerReplicaStates entries = new PerReplicaStates("state.json", 0, ImmutableList.of("R1:2:A", "R1:1:A:L", "R1:0:D", "R2:0:D", "R3:0:A"));
+ assertEquals(2, entries.get("R1").version); // of the three R1 entries, version 2 is expected to be the one returned
+ entries = new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:0:A", "R1:0:D"));
+ assertEquals(2, entries.get("R1").version); // same expectation with the entries listed in a different order
+ assertEquals(2, entries.get("R1").getDuplicates().size()); // the two other R1 entries are expected as duplicates
+ Set<String> modified = PerReplicaStates.findModifiedReplicas(entries, new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:1:A", "R1:0:D")));
+ assertEquals(1, modified.size()); // only R3 differs (version 0 vs 1)
+ assertTrue(modified.contains("R3"));
+ modified = PerReplicaStates.findModifiedReplicas( entries,
+ new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R2:0:D", "R3:1:A", "R1:0:D", "R4:0:A")));
+ assertEquals(2, modified.size()); // R3 changed, and R4 exists on one side only
+ assertTrue(modified.contains("R3"));
+ assertTrue(modified.contains("R4"));
+ modified = PerReplicaStates.findModifiedReplicas( entries,
+ new PerReplicaStates("state.json", 0, ImmutableList.of("R1:1:A:L", "R1:2:A", "R3:1:A", "R1:0:D", "R4:0:A")));
+ assertEquals(3, modified.size()); // R3 changed, R4 only in the second set, R2 only in the first
+ assertTrue(modified.contains("R3"));
+ assertTrue(modified.contains("R4"));
+ assertTrue(modified.contains("R2"));
+
+
+ }
+
+ public void testReplicaStateOperations() throws Exception { // round-trips add/flipState/delete/flipLeader ops through ZooKeeper
+ String root = "/testReplicaStateOperations";
+ cluster.getZkClient().create(root, null, CreateMode.PERSISTENT, true);
+
+ ImmutableList<String> states = ImmutableList.of("R1:2:A", "R1:1:A:L", "R1:0:D", "R3:0:A", "R4:13:A");
+
+ for (String state : states) { // each state string becomes the name of a child znode under root
+ cluster.getZkClient().create(root + "/" + state, null, CreateMode.PERSISTENT, true);
+ }
+
+ ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader();
+ PerReplicaStates rs = zkStateReader.getReplicaStates(new PerReplicaStates(root, 0, Collections.emptyList()));
+ assertEquals(3, rs.states.size()); // five child nodes, but only three distinct replica names: R1, R3, R4
+ assertTrue(rs.cversion >= 5); // five children were created above
+
+ List<PerReplicaStates.Op> ops = PerReplicaStates.WriteOps.addReplica("R5",State.ACTIVE, false, rs).get();
+
+ assertEquals(1, ops.size());
+ assertEquals(PerReplicaStates.Op.Type.ADD ,ops.get(0).typ );
+ PerReplicaStates.persist(ops, root,cluster.getZkClient());
+ rs = zkStateReader.getReplicaStates(root);
+ assertEquals(4, rs.states.size()); // R5 is now present alongside R1, R3, R4
+ assertTrue(rs.cversion >= 6);
+ assertEquals(6, cluster.getZkClient().getChildren(root, null,true).size()); // the stale R1 duplicates are still child nodes at this point
+ ops = PerReplicaStates.WriteOps.flipState("R1", State.DOWN , rs).get();
+
+ assertEquals(4, ops.size()); // one ADD for the new R1 entry plus three DELETEs for the existing R1 entries
+ assertEquals(PerReplicaStates.Op.Type.ADD, ops.get(0).typ);
+ assertEquals(PerReplicaStates.Op.Type.DELETE, ops.get(1).typ);
+ assertEquals(PerReplicaStates.Op.Type.DELETE, ops.get(2).typ);
+ assertEquals(PerReplicaStates.Op.Type.DELETE, ops.get(3).typ);
+ PerReplicaStates.persist(ops, root,cluster.getZkClient());
+ rs = zkStateReader.getReplicaStates(root);
+ assertEquals(4, rs.states.size());
+ assertEquals(3, rs.states.get("R1").version); // previous highest R1 version was 2
+
+ ops = PerReplicaStates.WriteOps.deleteReplica("R5" , rs).get();
+ assertEquals(1, ops.size());
+ PerReplicaStates.persist(ops, root,cluster.getZkClient());
+
+ rs = zkStateReader.getReplicaStates(root);
+ assertEquals(3, rs.states.size()); // R5 is gone again
+
+ ops = PerReplicaStates.WriteOps.flipLeader(ImmutableSet.of("R4","R3","R1"), "R4",rs).get();
+
+ assertEquals(2, ops.size()); // making R4 leader: one ADD and one DELETE
+ assertEquals(PerReplicaStates.Op.Type.ADD, ops.get(0).typ);
+ assertEquals(PerReplicaStates.Op.Type.DELETE, ops.get(1).typ);
+ PerReplicaStates.persist(ops, root,cluster.getZkClient());
+ rs = zkStateReader.getReplicaStates(root);
+ ops = PerReplicaStates.WriteOps.flipLeader(ImmutableSet.of("R4","R3","R1"),"R3",rs).get();
+ assertEquals(4, ops.size()); // presumably two ops to demote R4 and two to promote R3 -- TODO confirm against WriteOps.flipLeader
+ PerReplicaStates.persist(ops, root,cluster.getZkClient());
+ rs = zkStateReader.getReplicaStates(root);
+ assertTrue(rs.get("R3").isLeader);
+ }
+
+}