You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/12/22 13:21:25 UTC
lucene-solr:branch_7x: SOLR-11730: Add simulated tests for nodeAdded
/ nodeLost dynamic in a large cluster. Plus some other fixes: * Fix leader
election throttle and cluster state versioning in the simulator. *
PolicyHelper was still using a static ThreadLocal field, use ObjectCache instead.
Repository: lucene-solr
Updated Branches:
refs/heads/branch_7x a016ba704 -> 0290c95c4
SOLR-11730: Add simulated tests for nodeAdded / nodeLost dynamic in a large cluster.
Plus some other fixes:
* Fix leader election throttle and cluster state versioning in the simulator.
* PolicyHelper was still using a static ThreadLocal field, use ObjectCache instead.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/0290c95c
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/0290c95c
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/0290c95c
Branch: refs/heads/branch_7x
Commit: 0290c95c449d20eadbbd614860d0f739d131a62d
Parents: a016ba7
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Fri Dec 22 12:54:48 2017 +0100
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Fri Dec 22 12:58:59 2017 +0100
----------------------------------------------------------------------
.../org/apache/solr/cloud/ActionThrottle.java | 2 +-
.../cloud/autoscaling/ExecutePlanAction.java | 6 +-
.../solr/cloud/autoscaling/NodeLostTrigger.java | 2 +-
.../cloud/autoscaling/sim/LiveNodesSet.java | 4 +
.../cloud/autoscaling/sim/SimCloudManager.java | 25 ++-
.../sim/SimClusterStateProvider.java | 209 +++++++++---------
.../autoscaling/sim/SimNodeStateProvider.java | 31 +++
.../autoscaling/sim/SimSolrCloudTestCase.java | 96 ++++++++-
.../autoscaling/sim/TestComputePlanAction.java | 13 +-
.../autoscaling/sim/TestExecutePlanAction.java | 14 --
.../cloud/autoscaling/sim/TestLargeCluster.java | 213 ++++++++++++++++++-
.../autoscaling/sim/TestNodeAddedTrigger.java | 2 +-
.../autoscaling/sim/TestNodeLostTrigger.java | 8 +-
.../solrj/cloud/autoscaling/PolicyHelper.java | 9 +-
14 files changed, 479 insertions(+), 155 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0290c95c/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java b/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java
index 520a269..5e1d06a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ActionThrottle.java
@@ -70,7 +70,7 @@ public class ActionThrottle {
long diff = timeSource.getTime() - lastActionStartedAt;
int diffMs = (int) TimeUnit.MILLISECONDS.convert(diff, TimeUnit.NANOSECONDS);
long minNsBetweenActions = TimeUnit.NANOSECONDS.convert(minMsBetweenActions, TimeUnit.MILLISECONDS);
- log.info("The last {} attempt started {}ms ago.", name, diffMs);
+ log.debug("The last {} attempt started {}ms ago.", name, diffMs);
int sleep = 0;
if (diffMs > 0 && diff < minNsBetweenActions) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0290c95c/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
index 47b3440..bce0806 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
@@ -74,7 +74,7 @@ public class ExecutePlanAction extends TriggerActionBase {
req.setWaitForFinalState(true);
String asyncId = event.getSource() + '/' + event.getId() + '/' + counter;
String znode = saveAsyncId(cloudManager.getDistribStateManager(), event, asyncId);
- log.debug("Saved requestId: {} in znode: {}", asyncId, znode);
+ log.trace("Saved requestId: {} in znode: {}", asyncId, znode);
// TODO: find a better way of using async calls using dataProvider API !!!
req.setAsyncId(asyncId);
SolrResponse asyncResponse = cloudManager.request(req);
@@ -132,7 +132,7 @@ public class ExecutePlanAction extends TriggerActionBase {
statusResponse = (CollectionAdminRequest.RequestStatusResponse)cloudManager.request(CollectionAdminRequest.requestStatus(requestId));
state = statusResponse.getRequestStatus();
if (state == RequestStatusState.COMPLETED || state == RequestStatusState.FAILED) {
- log.debug("Task with requestId={} finished with state={} in {}s", requestId, state, i * 5);
+ log.trace("Task with requestId={} finished with state={} in {}s", requestId, state, i * 5);
cloudManager.request(CollectionAdminRequest.deleteAsyncId(requestId));
return statusResponse;
} else if (state == RequestStatusState.NOT_FOUND) {
@@ -156,7 +156,7 @@ public class ExecutePlanAction extends TriggerActionBase {
throw e;
}
if (i > 0 && i % 5 == 0) {
- log.debug("Task with requestId={} still not complete after {}s. Last state={}", requestId, i * 5, state);
+ log.trace("Task with requestId={} still not complete after {}s. Last state={}", requestId, i * 5, state);
}
cloudManager.getTimeSource().sleep(5000);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0290c95c/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
index 57c76c0..aaba1bd 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -163,7 +163,7 @@ public class NodeLostTrigger extends TriggerBase {
removeMarker(n);
});
} else {
- log.debug("NodeLostTrigger listener for lost nodes: {} is not ready, will try later", nodeNames);
+ log.debug("NodeLostTrigger processor for lost nodes: {} is not ready, will try later", nodeNames);
}
} else {
nodeNames.forEach(n -> {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0290c95c/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/LiveNodesSet.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/LiveNodesSet.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/LiveNodesSet.java
index 45cd66b..ca4ed71 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/LiveNodesSet.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/LiveNodesSet.java
@@ -37,6 +37,10 @@ public class LiveNodesSet {
return Collections.unmodifiableSet(set);
}
+ public int size() {
+ return set.size();
+ }
+
public void registerLiveNodesListener(LiveNodesListener listener) {
listeners.add(listener);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0290c95c/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
index bde4b41..cd9f177 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
@@ -27,7 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
@@ -96,10 +96,10 @@ public class SimCloudManager implements SolrCloudManager {
private TimeSource timeSource;
private final List<SolrInputDocument> systemColl = Collections.synchronizedList(new ArrayList<>());
- private final ExecutorService simCloudManagerPool;
- private final Map<String, AtomicLong> opCounts = new ConcurrentHashMap<>();
+ private final Map<String, AtomicLong> opCounts = new ConcurrentSkipListMap<>();
+ private ExecutorService simCloudManagerPool;
private Overseer.OverseerThread triggerThread;
private ThreadGroup triggerThreadGroup;
private SolrResourceLoader loader;
@@ -327,7 +327,8 @@ public class SimCloudManager implements SolrCloudManager {
/**
* Simulate the effect of restarting Overseer leader - in this case this means restarting the
- * OverseerTriggerThread and optionally killing a node.
+ * OverseerTriggerThread and optionally killing a node. All background tasks currently in progress
+ * will be interrupted.
* @param killNodeId optional nodeId to kill. If null then don't kill any node, just restart the thread
*/
public void simRestartOverseer(String killNodeId) throws Exception {
@@ -335,9 +336,17 @@ public class SimCloudManager implements SolrCloudManager {
triggerThread.interrupt();
IOUtils.closeQuietly(triggerThread);
if (killNodeId != null) {
- simRemoveNode(killNodeId, true);
+ simRemoveNode(killNodeId, false);
}
objectCache.clear();
+
+ try {
+ simCloudManagerPool.shutdownNow();
+ } catch (Exception e) {
+ // ignore
+ }
+ simCloudManagerPool = ExecutorUtil.newMDCAwareFixedThreadPool(200, new DefaultSolrThreadFactory("simCloudManagerPool"));
+
OverseerTriggerThread trigger = new OverseerTriggerThread(loader, this,
new CloudConfig.CloudConfigBuilder("nonexistent", 0, "sim").build());
triggerThread = new Overseer.OverseerThread(triggerThreadGroup, trigger, "Simulated OverseerAutoScalingTriggerThread");
@@ -378,6 +387,10 @@ public class SimCloudManager implements SolrCloudManager {
return opCounts;
}
+ public void simResetOpCounts() {
+ opCounts.clear();
+ }
+
/**
* Get the number of processed operations of a specified type.
* @param op operation name, eg. MOVEREPLICA
@@ -497,7 +510,7 @@ public class SimCloudManager implements SolrCloudManager {
if (action == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown action: " + a);
}
- LOG.debug("Invoking Collection Action :{} with params {}", action.toLower(), req.getParams().toQueryString());
+ LOG.trace("Invoking Collection Action :{} with params {}", action.toLower(), req.getParams().toQueryString());
NamedList results = new NamedList();
rsp.setResponse(results);
incrementCount(action.name());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0290c95c/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
index 1986bac..22f9fb9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
@@ -30,7 +30,6 @@ import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -72,12 +71,17 @@ import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;
+import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
+import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
import static org.apache.solr.common.params.CommonParams.NAME;
/**
@@ -108,7 +112,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
private final ReentrantLock lock = new ReentrantLock();
- private final ActionThrottle leaderThrottle;
+ private final Map<String, Map<String, ActionThrottle>> leaderThrottles = new ConcurrentHashMap<>();
// default map of: operation -> delay
private final Map<String, Long> defaultOpDelays = new HashMap<>();
@@ -116,7 +120,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
private final Map<String, Map<String, Long>> opDelays = new ConcurrentHashMap<>();
- private volatile int clusterStateVersion = -1;
+ private volatile int clusterStateVersion = 0;
private Map<String, Object> lastSavedProperties = null;
private AtomicReference<Map<String, DocCollection>> collectionsStatesRef = new AtomicReference<>();
@@ -133,7 +137,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
this.cloudManager = cloudManager;
this.stateManager = cloudManager.getSimDistribStateManager();
- this.leaderThrottle = new ActionThrottle("leader", 5000, cloudManager.getTimeSource());
// names are CollectionAction operation names, delays are in ms (simulated time)
defaultOpDelays.put(CollectionParams.CollectionAction.MOVEREPLICA.name(), 5000L);
defaultOpDelays.put(CollectionParams.CollectionAction.DELETEREPLICA.name(), 5000L);
@@ -191,10 +194,15 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
/**
- * Reset the leader election throttle.
+ * Reset the leader election throttles.
*/
- public void simResetLeaderThrottle() {
- leaderThrottle.reset();
+ public void simResetLeaderThrottles() {
+ leaderThrottles.clear();
+ }
+
+ private ActionThrottle getThrottle(String collection, String shard) {
+ return leaderThrottles.computeIfAbsent(collection, coll -> new ConcurrentHashMap<>())
+ .computeIfAbsent(shard, s -> new ActionThrottle("leader", 5000, cloudManager.getTimeSource()));
}
/**
@@ -225,30 +233,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
nodeReplicaMap.putIfAbsent(nodeId, new ArrayList<>());
}
- // utility class to run leader election in a separate thread and with throttling
- // Note: leader election is a no-op if a shard leader already exists for each shard
- private class LeaderElection implements Callable<Boolean> {
- Collection<String> collections;
- boolean saveClusterState;
-
- LeaderElection(Collection<String> collections, boolean saveClusterState) {
- this.collections = collections;
- this.saveClusterState = saveClusterState;
- }
-
- @Override
- public Boolean call() {
- leaderThrottle.minimumWaitBetweenActions();
- leaderThrottle.markAttemptingAction();
- try {
- simRunLeaderElection(collections, saveClusterState);
- } catch (Exception e) {
- return false;
- }
- return true;
- }
- }
-
/**
* Remove node from a cluster. This is equivalent to a situation when a node is lost.
* All replicas that were assigned to this node are marked as DOWN.
@@ -273,7 +257,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeId);
}
if (!collections.isEmpty()) {
- cloudManager.submit(new LeaderElection(collections, true));
+ simRunLeaderElection(collections, true);
}
return res;
} finally {
@@ -326,7 +310,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
if (!collections.isEmpty()) {
collectionsStatesRef.set(null);
- cloudManager.submit(new LeaderElection(collections, true));
+ simRunLeaderElection(collections, true);
return true;
} else {
return false;
@@ -431,7 +415,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.DISK, disk - 10);
if (runLeaderElection) {
- cloudManager.submit(new LeaderElection(Collections.singleton(replicaInfo.getCollection()), true));
+ simRunLeaderElection(Collections.singleton(replicaInfo.getCollection()), true);
}
} finally {
lock.unlock();
@@ -468,7 +452,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.DISK, disk + 10);
}
LOG.trace("-- simRemoveReplica {}", ri);
- cloudManager.submit(new LeaderElection(Collections.singleton(ri.getCollection()), true));
+ simRunLeaderElection(Collections.singleton(ri.getCollection()), true);
return;
}
}
@@ -482,14 +466,14 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* Save clusterstate.json to {@link DistribStateManager}.
* @return saved state
*/
- private ClusterState saveClusterState(ClusterState state) throws IOException {
+ private synchronized ClusterState saveClusterState(ClusterState state) throws IOException {
byte[] data = Utils.toJSON(state);
try {
VersionedData oldData = stateManager.getData(ZkStateReader.CLUSTER_STATE);
int version = oldData != null ? oldData.getVersion() : -1;
+ Assert.assertEquals(clusterStateVersion, version + 1);
stateManager.setData(ZkStateReader.CLUSTER_STATE, data, version);
- LOG.trace("-- saved cluster state version=" + clusterStateVersion +
- ", zkVersion=" + (version + 1) + ", {}", state);
+ clusterStateVersion++;
} catch (Exception e) {
throw new IOException(e);
}
@@ -515,75 +499,89 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* @param collections list of affected collections
* @param saveClusterState if true then save cluster state regardless of changes.
*/
- private synchronized void simRunLeaderElection(Collection<String> collections, boolean saveClusterState) throws Exception {
+ private void simRunLeaderElection(Collection<String> collections, boolean saveClusterState) throws Exception {
ClusterState state = getClusterState();
- AtomicBoolean stateChanged = new AtomicBoolean(Boolean.FALSE);
-
+ if (saveClusterState) {
+ collectionsStatesRef.set(null);
+ }
state.forEachCollection(dc -> {
if (!collections.contains(dc.getName())) {
return;
}
- dc.getSlices().forEach(s -> {
- Replica leader = s.getLeader();
- if (leader == null || !liveNodes.contains(leader.getNodeName())) {
- LOG.trace("Running leader election for " + dc.getName() + " / " + s.getName());
- if (s.getReplicas().isEmpty()) { // no replicas - punt
- return;
+ dc.getSlices().forEach(s ->
+ cloudManager.submit(() -> {
+ simRunLeaderElection(dc.getName(), s, saveClusterState);
+ return true;
+ })
+ );
+ });
+ }
+
+ private void simRunLeaderElection(String collection, Slice s, boolean saveClusterState) throws Exception {
+ AtomicBoolean stateChanged = new AtomicBoolean(Boolean.FALSE);
+ Replica leader = s.getLeader();
+ if (leader == null || !liveNodes.contains(leader.getNodeName())) {
+ LOG.trace("Running leader election for " + collection + " / " + s.getName());
+ if (s.getReplicas().isEmpty()) { // no replicas - punt
+ return;
+ }
+ ActionThrottle lt = getThrottle(collection, s.getName());
+ lt.minimumWaitBetweenActions();
+ lt.markAttemptingAction();
+
+ // mark all replicas as non-leader (probably not necessary) and collect all active and live
+ List<ReplicaInfo> active = new ArrayList<>();
+ s.getReplicas().forEach(r -> {
+ AtomicReference<ReplicaInfo> riRef = new AtomicReference<>();
+ // find our ReplicaInfo for this replica
+ nodeReplicaMap.get(r.getNodeName()).forEach(info -> {
+ if (info.getName().equals(r.getName())) {
+ riRef.set(info);
}
- // mark all replicas as non-leader (probably not necessary) and collect all active and live
- List<ReplicaInfo> active = new ArrayList<>();
- s.getReplicas().forEach(r -> {
- AtomicReference<ReplicaInfo> riRef = new AtomicReference<>();
- // find our ReplicaInfo for this replica
- nodeReplicaMap.get(r.getNodeName()).forEach(info -> {
- if (info.getName().equals(r.getName())) {
- riRef.set(info);
- }
- });
- ReplicaInfo ri = riRef.get();
- if (ri == null) {
- throw new IllegalStateException("-- could not find ReplicaInfo for replica " + r);
- }
- synchronized (ri) {
- if (ri.getVariables().remove(ZkStateReader.LEADER_PROP) != null) {
- stateChanged.set(true);
- }
- if (r.isActive(liveNodes.get())) {
- active.add(ri);
- } else { // if it's on a node that is not live mark it down
- if (!liveNodes.contains(r.getNodeName())) {
- ri.getVariables().put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
- }
- }
- }
- });
- if (active.isEmpty()) {
- LOG.warn("-- can't find any active replicas for " + dc.getName() + " / " + s.getName());
- return;
+ });
+ ReplicaInfo ri = riRef.get();
+ if (ri == null) {
+ throw new IllegalStateException("-- could not find ReplicaInfo for replica " + r);
+ }
+ synchronized (ri) {
+ if (ri.getVariables().remove(ZkStateReader.LEADER_PROP) != null) {
+ stateChanged.set(true);
}
- // pick first active one
- ReplicaInfo ri = null;
- for (ReplicaInfo a : active) {
- if (!a.getType().equals(Replica.Type.PULL)) {
- ri = a;
- break;
+ if (r.isActive(liveNodes.get())) {
+ active.add(ri);
+ } else { // if it's on a node that is not live mark it down
+ if (!liveNodes.contains(r.getNodeName())) {
+ ri.getVariables().put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
+ stateChanged.set(true);
}
}
- if (ri == null) {
- LOG.warn("-- can't find any suitable replica type for " + dc.getName() + " / " + s.getName());
- return;
- }
- synchronized (ri) {
- ri.getVariables().put(ZkStateReader.LEADER_PROP, "true");
- }
- stateChanged.set(true);
- LOG.debug("-- elected new leader for " + dc.getName() + " / " + s.getName() + ": " + ri);
- } else {
- LOG.trace("-- already has leader for {} / {}", dc.getName(), s.getName());
}
});
- });
- if (saveClusterState || stateChanged.get()) {
+ if (active.isEmpty()) {
+ LOG.warn("-- can't find any active replicas for " + collection + " / " + s.getName());
+ return;
+ }
+ // pick first active one
+ ReplicaInfo ri = null;
+ for (ReplicaInfo a : active) {
+ if (!a.getType().equals(Replica.Type.PULL)) {
+ ri = a;
+ break;
+ }
+ }
+ if (ri == null) {
+ LOG.warn("-- can't find any suitable replica type for " + collection + " / " + s.getName());
+ return;
+ }
+ synchronized (ri) {
+ ri.getVariables().put(ZkStateReader.LEADER_PROP, "true");
+ }
+ stateChanged.set(true);
+ LOG.debug("-- elected new leader for " + collection + " / " + s.getName() + ": " + ri);
+ } else {
+ LOG.trace("-- already has leader for {} / {}", collection, s.getName());
+ }
+ if (stateChanged.get()) {
collectionsStatesRef.set(null);
}
}
@@ -618,6 +616,12 @@ public class SimClusterStateProvider implements ClusterStateProvider {
if (sessionWrapper.get() != null) {
sessionWrapper.get().release();
}
+ // calculate expected number of positions
+ int numTlogReplicas = props.getInt(TLOG_REPLICAS, 0);
+ int numNrtReplicas = props.getInt(NRT_REPLICAS, props.getInt(REPLICATION_FACTOR, numTlogReplicas>0?0:1));
+ int numPullReplicas = props.getInt(PULL_REPLICAS, 0);
+ int totalReplicas = shardNames.size() * (numNrtReplicas + numPullReplicas + numTlogReplicas);
+ Assert.assertEquals("unexpected number of replica positions", totalReplicas, replicaPositions.size());
final CountDownLatch finalStateLatch = new CountDownLatch(replicaPositions.size());
AtomicInteger replicaNum = new AtomicInteger(1);
replicaPositions.forEach(pos -> {
@@ -652,7 +656,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
});
});
- cloudManager.submit(new LeaderElection(Collections.singleton(collectionName), true));
+ simRunLeaderElection(Collections.singleton(collectionName), true);
if (waitForFinalState) {
boolean finished = finalStateLatch.await(cloudManager.getTimeSource().convertDelay(TimeUnit.SECONDS, 60, TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
@@ -678,6 +682,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
try {
collProperties.remove(collection);
sliceProperties.remove(collection);
+ leaderThrottles.remove(collection);
opDelay(collection, CollectionParams.CollectionAction.DELETE.name());
@@ -716,9 +721,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
nodeReplicaMap.clear();
collProperties.clear();
sliceProperties.clear();
- cloudManager.getSimNodeStateProvider().simGetAllNodeValues().forEach((n, values) -> {
- values.put("cores", 0);
- });
+ leaderThrottles.clear();
+ cloudManager.getSimNodeStateProvider().simGetAllNodeValues().forEach((n, values) -> values.put("cores", 0));
collectionsStatesRef.set(null);
} finally {
lock.unlock();
@@ -831,7 +835,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
});
Map<String, Object> colProps = collProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>());
- cloudManager.submit(new LeaderElection(Collections.singleton(collectionName), true));
+ simRunLeaderElection(Collections.singleton(collectionName), true);
results.add("success", "");
} finally {
lock.unlock();
@@ -899,7 +903,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
solrCoreName, collectionName, replicaPosition.shard, replicaPosition.type, subShardNodeName, replicaProps);
simAddReplica(replicaPosition.node, ri, false);
}
- cloudManager.submit(new LeaderElection(Collections.singleton(collectionName), true));
+ simRunLeaderElection(Collections.singleton(collectionName), true);
results.add("success", "");
}
@@ -1201,7 +1205,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
lock.lock();
collectionsStatesRef.set(null);
- clusterStateVersion++;
saveClusterState.set(true);
try {
Map<String, Map<String, Map<String, Replica>>> collMap = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0290c95c/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
index a96a1d5..8310836 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
@@ -26,7 +26,9 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.cloud.autoscaling.NodeStateProvider;
@@ -150,6 +152,35 @@ public class SimNodeStateProvider implements NodeStateProvider {
}
/**
+ * Remove values that correspond to dead nodes. If values contained a 'nodeRole'
+ * key then /roles.json is updated.
+ */
+ public void simRemoveDeadNodes() {
+ Set<String> myNodes = new HashSet<>(nodeValues.keySet());
+ myNodes.removeAll(liveNodesSet.get());
+ AtomicBoolean updateRoles = new AtomicBoolean(false);
+ myNodes.forEach(n -> {
+ LOG.debug("- removing dead node values: " + n);
+ Map<String, Object> vals = nodeValues.remove(n);
+ if (vals.containsKey("nodeRole")) {
+ updateRoles.set(true);
+ }
+ });
+ if (updateRoles.get()) {
+ saveRoles();
+ }
+ }
+
+ /**
+ * Return a set of nodes that are not live but their values are still present.
+ */
+ public Set<String> simGetDeadNodes() {
+ Set<String> myNodes = new TreeSet<>(nodeValues.keySet());
+ myNodes.removeAll(liveNodesSet.get());
+ return myNodes;
+ }
+
+ /**
* Get all node values.
*/
public Map<String, Map<String, Object>> simGetAllNodeValues() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0290c95c/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimSolrCloudTestCase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimSolrCloudTestCase.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimSolrCloudTestCase.java
index be7209b..77ddbc0 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimSolrCloudTestCase.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimSolrCloudTestCase.java
@@ -20,15 +20,22 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
+import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
+import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.CollectionStatePredicate;
import org.apache.solr.common.cloud.DocCollection;
@@ -63,9 +70,11 @@ public class SimSolrCloudTestCase extends SolrTestCaseJ4 {
/** The cluster. */
protected static SimCloudManager cluster;
+ protected static int clusterNodeCount = 0;
protected static void configureCluster(int nodeCount, TimeSource timeSource) throws Exception {
cluster = SimCloudManager.createCluster(nodeCount, timeSource);
+ clusterNodeCount = nodeCount;
}
@AfterClass
@@ -77,11 +86,90 @@ public class SimSolrCloudTestCase extends SolrTestCaseJ4 {
}
@Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ if (cluster != null) {
+ log.info("\n");
+ log.info("#############################################");
+ log.info("############ FINAL CLUSTER STATS ############");
+ log.info("#############################################\n");
+ log.info("## Live nodes:\t\t" + cluster.getLiveNodesSet().size());
+ int emptyNodes = 0;
+ int maxReplicas = 0;
+ int minReplicas = Integer.MAX_VALUE;
+ Map<String, Map<Replica.State, AtomicInteger>> replicaStates = new TreeMap<>();
+ int numReplicas = 0;
+ for (String node : cluster.getLiveNodesSet().get()) {
+ List<ReplicaInfo> replicas = cluster.getSimClusterStateProvider().simGetReplicaInfos(node);
+ numReplicas += replicas.size();
+ if (replicas.size() > maxReplicas) {
+ maxReplicas = replicas.size();
+ }
+ if (minReplicas > replicas.size()) {
+ minReplicas = replicas.size();
+ }
+ for (ReplicaInfo ri : replicas) {
+ replicaStates.computeIfAbsent(ri.getCollection(), c -> new TreeMap<>())
+ .computeIfAbsent(ri.getState(), s -> new AtomicInteger())
+ .incrementAndGet();
+ }
+ if (replicas.isEmpty()) {
+ emptyNodes++;
+ }
+ }
+ if (minReplicas == Integer.MAX_VALUE) {
+ minReplicas = 0;
+ }
+ log.info("## Empty nodes:\t" + emptyNodes);
+ Set<String> deadNodes = cluster.getSimNodeStateProvider().simGetDeadNodes();
+ log.info("## Dead nodes:\t\t" + deadNodes.size());
+ deadNodes.forEach(n -> log.info("##\t\t" + n));
+ log.info("## Collections:\t" + cluster.getSimClusterStateProvider().simListCollections());
+ log.info("## Max replicas per node:\t" + maxReplicas);
+ log.info("## Min replicas per node:\t" + minReplicas);
+ log.info("## Total replicas:\t\t" + numReplicas);
+ replicaStates.forEach((c, map) -> {
+ AtomicInteger repCnt = new AtomicInteger();
+ map.forEach((s, cnt) -> repCnt.addAndGet(cnt.get()));
+ log.info("## * " + c + "\t\t" + repCnt.get());
+ map.forEach((s, cnt) -> log.info("##\t\t- " + String.format(Locale.ROOT, "%-12s %4d", s, cnt.get())));
+ });
+ log.info("######### Final Solr op counts ##########");
+ cluster.simGetOpCounts().forEach((k, cnt) -> log.info("##\t\t- " + String.format(Locale.ROOT, "%-14s %4d", k, cnt.get())));
+ log.info("######### Autoscaling event counts ###########");
+ TreeMap<String, Map<String, AtomicInteger>> counts = new TreeMap<>();
+ for (SolrInputDocument d : cluster.simGetSystemCollection()) {
+ if (!"autoscaling_event".equals(d.getFieldValue("type"))) {
+ continue;
+ }
+ counts.computeIfAbsent((String)d.getFieldValue("event.source_s"), s -> new TreeMap<>())
+ .computeIfAbsent((String)d.getFieldValue("stage_s"), s -> new AtomicInteger())
+ .incrementAndGet();
+ }
+ counts.forEach((trigger, map) -> {
+ log.info("## * Trigger: " + trigger);
+ map.forEach((s, cnt) -> log.info("##\t\t- " + String.format(Locale.ROOT, "%-11s %4d", s, cnt.get())));
+ });
+ }
+ }
+
+ @Override
public void setUp() throws Exception {
super.setUp();
if (cluster != null) {
- // clear any persisted auto scaling configuration
+ // clear any persisted configuration
cluster.getDistribStateManager().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), -1);
+ cluster.getDistribStateManager().setData(ZkStateReader.ROLES, Utils.toJSON(new HashMap<>()), -1);
+ // restore the expected number of nodes
+ int currentSize = cluster.getLiveNodesSet().size();
+ if (currentSize < clusterNodeCount) {
+ int addCnt = clusterNodeCount - currentSize;
+ while (addCnt-- > 0) {
+ cluster.simAddNode();
+ }
+ } else if (currentSize > clusterNodeCount) {
+ cluster.simRemoveRandomNodes(currentSize - clusterNodeCount, true, random());
+ }
// clean any persisted trigger state or events
removeChildren(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
removeChildren(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
@@ -89,8 +177,12 @@ public class SimSolrCloudTestCase extends SolrTestCaseJ4 {
removeChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
cluster.getSimClusterStateProvider().simDeleteAllCollections();
cluster.simClearSystemCollection();
- cluster.getSimClusterStateProvider().simResetLeaderThrottle();
+ // clear any dead nodes
+ cluster.getSimNodeStateProvider().simRemoveDeadNodes();
+ cluster.getSimClusterStateProvider().simResetLeaderThrottles();
cluster.simRestartOverseer(null);
+ cluster.getTimeSource().sleep(5000);
+ cluster.simResetOpCounts();
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0290c95c/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestComputePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestComputePlanAction.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestComputePlanAction.java
index b7053d7d..306d35f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestComputePlanAction.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestComputePlanAction.java
@@ -81,15 +81,6 @@ public class TestComputePlanAction extends SimSolrCloudTestCase {
triggerFiredLatch = new CountDownLatch(1);
actionContextPropsRef.set(null);
- if (cluster.getClusterStateProvider().getLiveNodes().size() > NODE_COUNT) {
- // stop some to get to original state
- int numJetties = cluster.getClusterStateProvider().getLiveNodes().size();
- for (int i = 0; i < numJetties - NODE_COUNT; i++) {
- String node = cluster.getSimClusterStateProvider().simGetRandomNode(random());
- cluster.getSimClusterStateProvider().simRemoveNode(node);
- }
- }
-
String setClusterPolicyCommand = "{" +
" 'set-cluster-policy': [" +
" {'cores':'<10', 'node':'#ANY'}," +
@@ -202,7 +193,7 @@ public class TestComputePlanAction extends SimSolrCloudTestCase {
"'waitFor' : '1s'," +
"'enabled' : true," +
"'actions' : [{'name':'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
- "{'name':'test','class':'" + TestComputePlanAction.AssertingTriggerAction.class.getName() + "'}]" +
+ "{'name':'test','class':'" + AssertingTriggerAction.class.getName() + "'}]" +
"}}";
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
NamedList<Object> response = solrClient.request(req);
@@ -245,7 +236,7 @@ public class TestComputePlanAction extends SimSolrCloudTestCase {
Map context = actionContextPropsRef.get();
assertNotNull(context);
List<SolrRequest> operations = (List<SolrRequest>) context.get("operations");
- assertNotNull("The operations computed by ComputePlanAction should not be null " + actionContextPropsRef.get(), operations);
+ assertNotNull("The operations computed by ComputePlanAction should not be null " + actionContextPropsRef.get() + "\nevent: " + eventRef.get(), operations);
operations.forEach(solrRequest -> log.info(solrRequest.getParams().toString()));
assertEquals("ComputePlanAction should have computed exactly 2 operation", 2, operations.size());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0290c95c/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestExecutePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestExecutePlanAction.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestExecutePlanAction.java
index 18d76dc..bb03fc5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestExecutePlanAction.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestExecutePlanAction.java
@@ -46,7 +46,6 @@ import org.apache.solr.common.util.Utils;
import org.apache.solr.util.LogLevel;
import org.apache.solr.common.util.TimeSource;
import org.junit.After;
-import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
@@ -66,19 +65,6 @@ public class TestExecutePlanAction extends SimSolrCloudTestCase {
configureCluster(NODE_COUNT, TimeSource.get("simTime:50"));
}
- @Before
- public void setUp() throws Exception {
- super.setUp();
-
- if (cluster.getClusterStateProvider().getLiveNodes().size() < NODE_COUNT) {
- // start some to get to original state
- int numJetties = cluster.getClusterStateProvider().getLiveNodes().size();
- for (int i = 0; i < NODE_COUNT - numJetties; i++) {
- cluster.simAddNode();
- }
- }
- }
-
@After
public void printState() throws Exception {
log.info("-------------_ FINAL STATE --------------");
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0290c95c/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
index 034a039..6758987 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
@@ -42,6 +42,7 @@ import org.apache.solr.cloud.autoscaling.TriggerActionBase;
import org.apache.solr.cloud.autoscaling.TriggerEvent;
import org.apache.solr.cloud.autoscaling.TriggerListenerBase;
import org.apache.solr.cloud.autoscaling.CapturedEvent;
+import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.util.NamedList;
@@ -79,15 +80,10 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
@Before
public void setupTest() throws Exception {
- waitForSeconds = 1 + random().nextInt(3);
+ waitForSeconds = 5;
triggerFiredCount.set(0);
triggerFiredLatch = new CountDownLatch(1);
listenerEvents.clear();
- while (cluster.getClusterStateProvider().getLiveNodes().size() < NUM_NODES) {
- // perhaps a test stopped a node but didn't start it back
- // lets start a node
- cluster.simAddNode();
- }
}
public static class TestTriggerListener extends TriggerListenerBase {
@@ -163,6 +159,7 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
"conf", 5, 5, 5, 5);
create.setMaxShardsPerNode(1);
+ create.setAutoAddReplicas(false);
create.setCreateNodeSet(String.join(",", nodes));
create.process(solrClient);
@@ -196,7 +193,6 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
}
log.info("Ready after " + waitForState(collectionName, 30 * nodes.size(), TimeUnit.SECONDS, clusterShape(5, 15)) + "ms");
- log.info("OP COUNTS: " + cluster.simGetOpCounts());
long newMoveReplicaOps = cluster.simGetOpCount(CollectionParams.CollectionAction.MOVEREPLICA.name());
log.info("==== Flaky replicas: {}. Additional MOVEREPLICA count: {}", flakyReplicas, (newMoveReplicaOps - moveReplicaOps));
// flaky nodes lead to a number of MOVEREPLICA that is non-zero but lower than the number of flaky replicas
@@ -206,6 +202,208 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
}
@Test
+ public void testAddNode() throws Exception {
+ SolrClient solrClient = cluster.simGetSolrClient();
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_added_trigger'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'enabled' : true," +
+ "'actions' : [" +
+ "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
+ "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
+ "]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // create a collection with more than 1 replica per node
+ String collectionName = "testNodeAdded";
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
+ "conf", NUM_NODES / 10, NUM_NODES / 10, NUM_NODES / 10, NUM_NODES / 10);
+ create.setMaxShardsPerNode(5);
+ create.setAutoAddReplicas(false);
+ create.process(solrClient);
+
+ log.info("Ready after " + waitForState(collectionName, 20 * NUM_NODES, TimeUnit.SECONDS, clusterShape(NUM_NODES / 10, NUM_NODES / 10 * 3)) + " ms");
+
+ int numAddNode = NUM_NODES / 5;
+ List<String> addNodesList = new ArrayList<>(numAddNode);
+ for (int i = 0; i < numAddNode; i++) {
+ addNodesList.add(cluster.simAddNode());
+ cluster.getTimeSource().sleep(5000);
+ }
+ List<SolrInputDocument> systemColl = cluster.simGetSystemCollection();
+ int startedEventPos = -1;
+ for (int i = 0; i < systemColl.size(); i++) {
+ SolrInputDocument d = systemColl.get(i);
+ if (!"node_added_trigger".equals(d.getFieldValue("event.source_s"))) {
+ continue;
+ }
+ if ("NODEADDED".equals(d.getFieldValue("event.type_s")) &&
+ "STARTED".equals(d.getFieldValue("stage_s"))) {
+ startedEventPos = i;
+ break;
+ }
+ }
+ assertTrue("no STARTED event", startedEventPos > -1);
+ SolrInputDocument startedEvent = systemColl.get(startedEventPos);
+ int ignored = 0;
+ int lastIgnoredPos = startedEventPos;
+ for (int i = startedEventPos + 1; i < systemColl.size(); i++) {
+ SolrInputDocument d = systemColl.get(i);
+ if (!"node_added_trigger".equals(d.getFieldValue("event.source_s"))) {
+ continue;
+ }
+ if ("NODEADDED".equals(d.getFieldValue("event.type_s"))) {
+ if ("IGNORED".equals(d.getFieldValue("stage_s"))) {
+ ignored++;
+ lastIgnoredPos = i;
+ }
+ }
+ }
+ assertTrue("no IGNORED events", ignored > 0);
+ // make sure some replicas have been moved
+ assertTrue("no MOVEREPLICA ops?", cluster.simGetOpCount("MOVEREPLICA") > 0);
+
+ log.info("Ready after " + waitForState(collectionName, 20 * NUM_NODES, TimeUnit.SECONDS, clusterShape(NUM_NODES / 10, NUM_NODES / 10 * 3)) + " ms");
+
+ int count = 50;
+ SolrInputDocument finishedEvent = null;
+ long lastNumOps = cluster.simGetOpCount("MOVEREPLICA");
+ while (count-- > 0) {
+ cluster.getTimeSource().sleep(150000);
+ long currentNumOps = cluster.simGetOpCount("MOVEREPLICA");
+ if (currentNumOps == lastNumOps) {
+ int size = systemColl.size() - 1;
+ for (int i = size; i > lastIgnoredPos; i--) {
+ SolrInputDocument d = systemColl.get(i);
+ if (!"node_added_trigger".equals(d.getFieldValue("event.source_s"))) {
+ continue;
+ }
+ if ("SUCCEEDED".equals(d.getFieldValue("stage_s"))) {
+ finishedEvent = d;
+ break;
+ }
+ }
+ break;
+ } else {
+ lastNumOps = currentNumOps;
+ }
+ }
+
+ assertTrue("did not finish processing changes", finishedEvent != null);
+ long delta = (Long)finishedEvent.getFieldValue("event.time_l") - (Long)startedEvent.getFieldValue("event.time_l");
+ log.info("#### System stabilized after " + TimeUnit.NANOSECONDS.toMillis(delta) + " ms");
+ assertTrue("unexpected number of MOVEREPLICA ops", cluster.simGetOpCount("MOVEREPLICA") > 1);
+ }
+
+ @Test
+ public void testNodeLost() throws Exception {
+ SolrClient solrClient = cluster.simGetSolrClient();
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_lost_trigger'," +
+ "'event' : 'nodeLost'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'enabled' : true," +
+ "'actions' : [" +
+ "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
+ "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}" +
+ "]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // create a collection with 1 replica per node
+ String collectionName = "testNodeLost";
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
+ "conf", NUM_NODES / 5, NUM_NODES / 10);
+ create.setMaxShardsPerNode(5);
+ create.setAutoAddReplicas(false);
+ create.process(solrClient);
+
+ log.info("Ready after " + waitForState(collectionName, 20 * NUM_NODES, TimeUnit.SECONDS, clusterShape(NUM_NODES / 5, NUM_NODES / 10)) + " ms");
+
+ // start killing nodes
+ int numNodes = NUM_NODES / 5;
+ List<String> nodes = new ArrayList<>(cluster.getLiveNodesSet().get());
+ for (int i = 0; i < numNodes; i++) {
+ // this may also select a node where a replica is moved to, so the total number of
+ // MOVEREPLICA may vary
+ cluster.simRemoveNode(nodes.get(i), false);
+ cluster.getTimeSource().sleep(4000);
+ }
+ List<SolrInputDocument> systemColl = cluster.simGetSystemCollection();
+ int startedEventPos = -1;
+ for (int i = 0; i < systemColl.size(); i++) {
+ SolrInputDocument d = systemColl.get(i);
+ if (!"node_lost_trigger".equals(d.getFieldValue("event.source_s"))) {
+ continue;
+ }
+ if ("NODELOST".equals(d.getFieldValue("event.type_s")) &&
+ "STARTED".equals(d.getFieldValue("stage_s"))) {
+ startedEventPos = i;
+ break;
+ }
+ }
+ assertTrue("no STARTED event: " + systemColl, startedEventPos > -1);
+ SolrInputDocument startedEvent = systemColl.get(startedEventPos);
+ int ignored = 0;
+ int lastIgnoredPos = startedEventPos;
+ for (int i = startedEventPos + 1; i < systemColl.size(); i++) {
+ SolrInputDocument d = systemColl.get(i);
+ if (!"node_lost_trigger".equals(d.getFieldValue("event.source_s"))) {
+ continue;
+ }
+ if ("NODELOST".equals(d.getFieldValue("event.type_s"))) {
+ if ("IGNORED".equals(d.getFieldValue("stage_s"))) {
+ ignored++;
+ lastIgnoredPos = i;
+ }
+ }
+ }
+ assertTrue("no IGNORED events", ignored > 0);
+ // make sure some replicas have been moved
+ assertTrue("no MOVEREPLICA ops?", cluster.simGetOpCount("MOVEREPLICA") > 0);
+
+ log.info("Ready after " + waitForState(collectionName, 20 * NUM_NODES, TimeUnit.SECONDS, clusterShape(NUM_NODES / 5, NUM_NODES / 10)) + " ms");
+
+ int count = 50;
+ SolrInputDocument finishedEvent = null;
+ long lastNumOps = cluster.simGetOpCount("MOVEREPLICA");
+ while (count-- > 0) {
+ cluster.getTimeSource().sleep(150000);
+ long currentNumOps = cluster.simGetOpCount("MOVEREPLICA");
+ if (currentNumOps == lastNumOps) {
+ int size = systemColl.size() - 1;
+ for (int i = size; i > lastIgnoredPos; i--) {
+ SolrInputDocument d = systemColl.get(i);
+ if (!"node_lost_trigger".equals(d.getFieldValue("event.source_s"))) {
+ continue;
+ }
+ if ("SUCCEEDED".equals(d.getFieldValue("stage_s"))) {
+ finishedEvent = d;
+ break;
+ }
+ }
+ break;
+ } else {
+ lastNumOps = currentNumOps;
+ }
+ }
+
+ assertTrue("did not finish processing changes", finishedEvent != null);
+ long delta = (Long)finishedEvent.getFieldValue("event.time_l") - (Long)startedEvent.getFieldValue("event.time_l");
+ log.info("#### System stabilized after " + TimeUnit.NANOSECONDS.toMillis(delta) + " ms");
+ long ops = cluster.simGetOpCount("MOVEREPLICA");
+ assertTrue("unexpected number of MOVEREPLICA ops: " + ops, ops >= 40);
+ }
+
+ @Test
@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-11714")
public void testSearchRate() throws Exception {
SolrClient solrClient = cluster.simGetSolrClient();
@@ -255,7 +453,6 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
// simulate search traffic
cluster.getSimClusterStateProvider().simSetShardValue(collectionName, "shard1", metricName, 40, true);
- Thread.sleep(1000000000);
// boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
// assertTrue("The trigger did not fire at all", await);
// wait for listener to capture the SUCCEEDED stage
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0290c95c/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeAddedTrigger.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeAddedTrigger.java
index 9b4e2bc..f938d5c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeAddedTrigger.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeAddedTrigger.java
@@ -137,7 +137,7 @@ public class TestNodeAddedTrigger extends SimSolrCloudTestCase {
return true;
});
trigger.run(); // first run should detect the new node
- cluster.simRemoveNode(newNode, true);
+ cluster.simRemoveNode(newNode, false);
int counter = 0;
do {
trigger.run();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0290c95c/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeLostTrigger.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeLostTrigger.java
index 109cee3..1474c69 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeLostTrigger.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeLostTrigger.java
@@ -81,8 +81,8 @@ public class TestNodeLostTrigger extends SimSolrCloudTestCase {
Iterator<String> it = cluster.getLiveNodesSet().get().iterator();
String lostNodeName1 = it.next();
String lostNodeName2 = it.next();
- cluster.simRemoveNode(lostNodeName1, true);
- cluster.simRemoveNode(lostNodeName2, true);
+ cluster.simRemoveNode(lostNodeName1, false);
+ cluster.simRemoveNode(lostNodeName2, false);
timeSource.sleep(1000);
AtomicBoolean fired = new AtomicBoolean(false);
@@ -223,7 +223,7 @@ public class TestNodeLostTrigger extends SimSolrCloudTestCase {
trigger.run(); // starts tracking live nodes
// stop the newly created node
- cluster.simRemoveNode(newNode, true);
+ cluster.simRemoveNode(newNode, false);
AtomicInteger callCount = new AtomicInteger(0);
AtomicBoolean fired = new AtomicBoolean(false);
@@ -263,7 +263,7 @@ public class TestNodeLostTrigger extends SimSolrCloudTestCase {
trigger.run();
// stop the newly created node
- cluster.simRemoveNode(newNode, true);
+ cluster.simRemoveNode(newNode, false);
trigger.run(); // this run should detect the lost node
trigger.close(); // close the old trigger
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0290c95c/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
index d091e34..b37387b 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
@@ -47,9 +47,15 @@ import static org.apache.solr.common.util.Utils.time;
import static org.apache.solr.common.util.Utils.timeElapsed;
public class PolicyHelper {
- private static ThreadLocal<Map<String, String>> policyMapping = new ThreadLocal<>();
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final String POLICY_MAPPING_KEY = "PolicyHelper.policyMapping";
+
+ private static ThreadLocal<Map<String, String>> getPolicyMapping(SolrCloudManager cloudManager) {
+ return (ThreadLocal<Map<String, String>>)cloudManager.getObjectCache()
+ .computeIfAbsent(POLICY_MAPPING_KEY, k -> new ThreadLocal<>());
+ }
+
public static List<ReplicaPosition> getReplicaLocations(String collName, AutoScalingConfig autoScalingConfig,
SolrCloudManager cloudManager,
Map<String, String> optionalPolicyMapping,
@@ -59,6 +65,7 @@ public class PolicyHelper {
int pullReplicas,
List<String> nodesList) {
List<ReplicaPosition> positions = new ArrayList<>();
+ ThreadLocal<Map<String, String>> policyMapping = getPolicyMapping(cloudManager);
ClusterStateProvider stateProvider = new DelegatingClusterStateProvider(cloudManager.getClusterStateProvider()) {
@Override
public String getPolicyNameByCollection(String coll) {