You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/12/11 18:52:40 UTC
[21/21] lucene-solr:jira/solr-11285-sim: SOLR-11403: Keep track of
/live_nodes entries and ephemeral nodeAdded / nodeLost nodes. Port over
NodeAdded and NodeLost trigger tests.
SOLR-11403: Keep track of /live_nodes entries and ephemeral nodeAdded / nodeLost nodes.
Port over NodeAdded and NodeLost trigger tests.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/9e1c2490
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/9e1c2490
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/9e1c2490
Branch: refs/heads/jira/solr-11285-sim
Commit: 9e1c2490f33b3d215fd90d2ee1c6262380180ac1
Parents: 403812c
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Mon Dec 11 19:51:28 2017 +0100
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Mon Dec 11 19:51:28 2017 +0100
----------------------------------------------------------------------
.../cloud/autoscaling/ExecutePlanAction.java | 8 +-
.../cloud/autoscaling/sim/LiveNodesSet.java | 99 ++++++
.../cloud/autoscaling/sim/SimCloudManager.java | 98 +++---
.../sim/SimClusterStateProvider.java | 70 +++-
.../autoscaling/sim/SimDistribStateManager.java | 17 +
.../autoscaling/sim/SimNodeStateProvider.java | 8 +-
.../sim/TestClusterStateProvider.java | 25 +-
.../autoscaling/sim/TestNodeAddedTrigger.java | 306 +++++++++++++++++
.../autoscaling/sim/TestNodeLostTrigger.java | 334 +++++++++++++++++++
.../autoscaling/sim/TestTriggerIntegration.java | 146 ++++----
10 files changed, 949 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9e1c2490/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
index 841856e..1e1b0ac 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
@@ -108,15 +108,13 @@ public class ExecutePlanAction extends TriggerActionBase {
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unexpected exception executing operation: " + operation.getParams(), e);
-// } catch (InterruptedException e) {
-// Thread.currentThread().interrupt();
-// throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "ExecutePlanAction was interrupted", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "ExecutePlanAction was interrupted", e);
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unexpected exception executing operation: " + operation.getParams(), e);
}
-
-// counter++;
}
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9e1c2490/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/LiveNodesSet.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/LiveNodesSet.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/LiveNodesSet.java
new file mode 100644
index 0000000..45cd66b
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/LiveNodesSet.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.solr.common.cloud.LiveNodesListener;
+
+/**
+ * This class represents a set of live nodes and allows adding listeners to track their state.
+ */
+public class LiveNodesSet {
+
+ private final Set<String> set = ConcurrentHashMap.newKeySet();
+ private final Set<LiveNodesListener> listeners = ConcurrentHashMap.newKeySet();
+
+ public Set<String> get() {
+ return Collections.unmodifiableSet(set);
+ }
+
+ public void registerLiveNodesListener(LiveNodesListener listener) {
+ listeners.add(listener);
+ }
+
+ public void removeLiveNodesListener(LiveNodesListener listener) {
+ listeners.remove(listener);
+ }
+
+ private void fireListeners(SortedSet<String> oldNodes, SortedSet<String> newNodes) {
+ for (LiveNodesListener listener : listeners) {
+ listener.onChange(oldNodes, newNodes);
+ }
+ }
+
+ public boolean isEmpty() {
+ return set.isEmpty();
+ }
+
+ public boolean contains(String id) {
+ return set.contains(id);
+ }
+
+ public synchronized boolean add(String id) {
+ if (set.contains(id)) {
+ return false;
+ }
+ TreeSet<String> oldNodes = new TreeSet<>(set);
+ set.add(id);
+ TreeSet<String> newNodes = new TreeSet<>(set);
+ fireListeners(oldNodes, newNodes);
+ return true;
+ }
+
+ public synchronized boolean addAll(Collection<String> nodes) {
+ TreeSet<String> oldNodes = new TreeSet<>(set);
+ boolean changed = set.addAll(nodes);
+ TreeSet<String> newNodes = new TreeSet<>(set);
+ if (changed) {
+ fireListeners(oldNodes, newNodes);
+ }
+ return changed;
+ }
+
+ public synchronized boolean remove(String id) {
+ if (!set.contains(id)) {
+ return false;
+ }
+ TreeSet<String> oldNodes = new TreeSet<>(set);
+ set.remove(id);
+ TreeSet<String> newNodes = new TreeSet<>(set);
+ fireListeners(oldNodes, newNodes);
+ return true;
+ }
+
+ public synchronized void clear() {
+ TreeSet<String> oldNodes = new TreeSet<>(set);
+ set.clear();
+ fireListeners(oldNodes, Collections.emptySortedSet());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9e1c2490/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
index 9c8cc29..92840c8 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimCloudManager.java
@@ -26,7 +26,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -91,11 +90,10 @@ public class SimCloudManager implements SolrCloudManager {
private final SimClusterStateProvider clusterStateProvider;
private final SimNodeStateProvider nodeStateProvider;
private final AutoScalingHandler autoScalingHandler;
- private final Set<String> liveNodes = ConcurrentHashMap.newKeySet();
+ private final LiveNodesSet liveNodesSet = new LiveNodesSet();
private final DistributedQueueFactory queueFactory;
private final ObjectCache objectCache = new ObjectCache();
private TimeSource timeSource;
- private SolrClient solrClient;
private final List<SolrInputDocument> systemColl = Collections.synchronizedList(new ArrayList<>());
private final ExecutorService simCloudManagerPool;
@@ -103,11 +101,14 @@ public class SimCloudManager implements SolrCloudManager {
private Overseer.OverseerThread triggerThread;
+ private ThreadGroup triggerThreadGroup;
+ private SolrResourceLoader loader;
private static int nodeIdPort = 10000;
public SimCloudManager(TimeSource timeSource) throws Exception {
this.stateManager = new SimDistribStateManager();
+ this.loader = new SolrResourceLoader();
// init common paths
stateManager.makePath(ZkStateReader.CLUSTER_STATE);
stateManager.makePath(ZkStateReader.CLUSTER_PROPS);
@@ -120,22 +121,18 @@ public class SimCloudManager implements SolrCloudManager {
stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
this.timeSource = timeSource != null ? timeSource : TimeSource.NANO_TIME;
- this.clusterStateProvider = new SimClusterStateProvider(liveNodes, this);
- this.nodeStateProvider = new SimNodeStateProvider(liveNodes, this.stateManager, this.clusterStateProvider, null);
+ this.clusterStateProvider = new SimClusterStateProvider(liveNodesSet, this);
+ this.nodeStateProvider = new SimNodeStateProvider(liveNodesSet, this.stateManager, this.clusterStateProvider, null);
this.queueFactory = new GenericDistributedQueueFactory(stateManager);
this.simCloudManagerPool = ExecutorUtil.newMDCAwareFixedThreadPool(200, new DefaultSolrThreadFactory("simCloudManagerPool"));
- this.autoScalingHandler = new AutoScalingHandler(this, new SolrResourceLoader());
- ThreadGroup triggerThreadGroup = new ThreadGroup("Simulated Overseer autoscaling triggers");
- OverseerTriggerThread trigger = new OverseerTriggerThread(new SolrResourceLoader(), this,
+ this.autoScalingHandler = new AutoScalingHandler(this, loader);
+ triggerThreadGroup = new ThreadGroup("Simulated Overseer autoscaling triggers");
+ OverseerTriggerThread trigger = new OverseerTriggerThread(loader, this,
new CloudConfig.CloudConfigBuilder("nonexistent", 0, "sim").build());
triggerThread = new Overseer.OverseerThread(triggerThreadGroup, trigger, "Simulated OverseerAutoScalingTriggerThread");
triggerThread.start();
}
- public void setSolrClient(SolrClient solrClient) {
- this.solrClient = solrClient;
- }
-
// ---------- simulator setup methods -----------
public static SimCloudManager createCluster(int numNodes, TimeSource timeSource) throws Exception {
@@ -213,6 +210,10 @@ public class SimCloudManager implements SolrCloudManager {
return values;
}
+ public SolrResourceLoader getLoader() {
+ return loader;
+ }
+
/**
* Add a new node and initialize its node values (metrics).
* @return new node id
@@ -250,7 +251,7 @@ public class SimCloudManager implements SolrCloudManager {
* @param random random
*/
public void simRemoveRandomNodes(int number, boolean withValues, Random random) throws Exception {
- List<String> nodes = new ArrayList<>(liveNodes);
+ List<String> nodes = new ArrayList<>(liveNodesSet.get());
Collections.shuffle(nodes, random);
int count = Math.min(number, nodes.size());
for (int i = 0; i < count; i++) {
@@ -279,22 +280,37 @@ public class SimCloudManager implements SolrCloudManager {
* @return simulated SolrClient.
*/
public SolrClient simGetSolrClient() {
- if (solrClient != null) {
- return solrClient;
- } else {
- return new SolrClient() {
- @Override
- public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
- SolrResponse rsp = SimCloudManager.this.request(request);
- return rsp.getResponse();
- }
-
- @Override
- public void close() throws IOException {
-
- }
- };
+ return new SolrClient() {
+ @Override
+ public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
+ SolrResponse rsp = SimCloudManager.this.request(request);
+ return rsp.getResponse();
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+ };
+ }
+
+ /**
+ * Simulate the effect of restarting Overseer leader - in this case this means restarting the
+ * OverseerTriggerThread.
+ * @param killNodeId optional nodeId to kill. If null then don't kill any node, just restart the thread
+ */
+ public void simRestartOverseer(String killNodeId) throws Exception {
+ LOG.info("=== Restarting OverseerTriggerThread...");
+ IOUtils.closeQuietly(triggerThread);
+ triggerThread.interrupt();
+ if (killNodeId != null) {
+ simRemoveNode(killNodeId, true);
}
+ OverseerTriggerThread trigger = new OverseerTriggerThread(loader, this,
+ new CloudConfig.CloudConfigBuilder("nonexistent", 0, "sim").build());
+ triggerThread = new Overseer.OverseerThread(triggerThreadGroup, trigger, "Simulated OverseerAutoScalingTriggerThread");
+ triggerThread.start();
+
}
/**
@@ -319,6 +335,10 @@ public class SimCloudManager implements SolrCloudManager {
return stateManager;
}
+ public LiveNodesSet getLiveNodesSet() {
+ return liveNodesSet;
+ }
+
public Map<String, AtomicLong> simGetOpCounts() {
return opCounts;
}
@@ -363,19 +383,11 @@ public class SimCloudManager implements SolrCloudManager {
@Override
public SolrResponse request(SolrRequest req) throws IOException {
- if (solrClient != null) {
- try {
- return req.process(solrClient);
- } catch (SolrServerException e) {
- throw new IOException(e);
- }
- } else {
- try {
- Future<SolrResponse> res = submit(() -> simHandleSolrRequest(req));
- return res.get();
- } catch (Exception e) {
- throw new IOException(e);
- }
+ try {
+ Future<SolrResponse> res = submit(() -> simHandleSolrRequest(req));
+ return res.get();
+ } catch (Exception e) {
+ throw new IOException(e);
}
}
@@ -499,8 +511,8 @@ public class SimCloudManager implements SolrCloudManager {
if (req.getParams().get(CommonAdminParams.ASYNC) != null) {
results.add(REQUESTID, req.getParams().get(CommonAdminParams.ASYNC));
}
- if (!liveNodes.isEmpty()) {
- results.add("leader", liveNodes.iterator().next());
+ if (!liveNodesSet.get().isEmpty()) {
+ results.add("leader", liveNodesSet.get().iterator().next());
}
results.add("overseer_queue_size", 0);
results.add("overseer_work_queue_size", 0);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9e1c2490/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
index d51c059..e63e8ba 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
@@ -38,10 +38,13 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggestion;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.cloud.ActionThrottle;
@@ -67,6 +70,7 @@ import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,8 +86,8 @@ public class SimClusterStateProvider implements ClusterStateProvider {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Map<String, List<ReplicaInfo>> nodeReplicaMap = new ConcurrentHashMap<>();
- private final Set<String> liveNodes;
- private final DistribStateManager stateManager;
+ private final LiveNodesSet liveNodes;
+ private final SimDistribStateManager stateManager;
private final SimCloudManager cloudManager;
private final Map<String, Object> clusterProperties = new ConcurrentHashMap<>();
@@ -109,10 +113,13 @@ public class SimClusterStateProvider implements ClusterStateProvider {
* The instance needs to be initialized using the <code>sim*</code> methods in order
* to ensure proper behavior, otherwise it will behave as a cluster with zero replicas.
*/
- public SimClusterStateProvider(Set<String> liveNodes, SimCloudManager cloudManager) {
+ public SimClusterStateProvider(LiveNodesSet liveNodes, SimCloudManager cloudManager) throws Exception {
this.liveNodes = liveNodes;
+ for (String nodeId : liveNodes.get()) {
+ createEphemeralLiveNode(nodeId);
+ }
this.cloudManager = cloudManager;
- this.stateManager = cloudManager.getDistribStateManager();
+ this.stateManager = cloudManager.getSimDistribStateManager();
this.leaderThrottle = new ActionThrottle("leader", 5000, cloudManager.getTimeSource());
// names are CollectionAction names, delays are in ms (simulated time)
defaultOpDelays.put(CollectionParams.CollectionAction.MOVEREPLICA.name(), 5000L);
@@ -138,7 +145,18 @@ public class SimClusterStateProvider implements ClusterStateProvider {
sliceProperties.clear();
nodeReplicaMap.clear();
liveNodes.clear();
+ for (String nodeId : stateManager.listData(ZkStateReader.LIVE_NODES_ZKNODE)) {
+ if (stateManager.hasData(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeId)) {
+ stateManager.removeData(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeId, -1);
+ }
+ if (stateManager.hasData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeId)) {
+ stateManager.removeData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeId, -1);
+ }
+ }
liveNodes.addAll(initialState.getLiveNodes());
+ for (String nodeId : liveNodes.get()) {
+ createEphemeralLiveNode(nodeId);
+ }
initialState.forEachCollection(dc -> {
collProperties.computeIfAbsent(dc.getName(), name -> new ConcurrentHashMap<>()).putAll(dc.getProperties());
opDelays.computeIfAbsent(dc.getName(), c -> new HashMap<>()).putAll(defaultOpDelays);
@@ -147,7 +165,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
.computeIfAbsent(s.getName(), name -> new HashMap<>()).putAll(s.getProperties());
s.getReplicas().forEach(r -> {
ReplicaInfo ri = new ReplicaInfo(r.getName(), r.getCoreName(), dc.getName(), s.getName(), r.getType(), r.getNodeName(), r.getProperties());
- if (liveNodes.contains(r.getNodeName())) {
+ if (liveNodes.get().contains(r.getNodeName())) {
nodeReplicaMap.computeIfAbsent(r.getNodeName(), rn -> new ArrayList<>()).add(ri);
}
});
@@ -175,11 +193,10 @@ public class SimClusterStateProvider implements ClusterStateProvider {
if (liveNodes.isEmpty()) {
return null;
}
- List<String> nodes = new ArrayList<>(liveNodes);
+ List<String> nodes = new ArrayList<>(liveNodes.get());
return nodes.get(random.nextInt(nodes.size()));
}
- // todo: maybe hook up DistribStateManager /live_nodes ?
// todo: maybe hook up DistribStateManager /clusterstate.json watchers?
/**
@@ -191,6 +208,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
throw new Exception("Node " + nodeId + " already exists");
}
liveNodes.add(nodeId);
+ createEphemeralLiveNode(nodeId);
nodeReplicaMap.putIfAbsent(nodeId, new ArrayList<>());
}
@@ -218,7 +236,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
}
- // todo: maybe hook up DistribStateManager /live_nodes ?
// todo: maybe hook up DistribStateManager /clusterstate.json watchers?
/**
@@ -234,6 +251,13 @@ public class SimClusterStateProvider implements ClusterStateProvider {
// mark every replica on that node as down
setReplicaStates(nodeId, Replica.State.DOWN, collections);
boolean res = liveNodes.remove(nodeId);
+ // remove ephemeral nodes
+ stateManager.getRoot().removeEphemeralChildren(nodeId);
+ // create a nodeLost marker if needed
+ AutoScalingConfig cfg = stateManager.getAutoScalingConfig(null);
+ if (cfg.hasTriggerForEvents(TriggerEventType.NODELOST)) {
+ stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeId);
+ }
if (!collections.isEmpty()) {
cloudManager.submit(new LeaderElection(collections, true));
}
@@ -254,8 +278,19 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
}
+ // this method needs to be called under a lock
+ private void createEphemeralLiveNode(String nodeId) throws Exception {
+ DistribStateManager mgr = stateManager.withEphemeralId(nodeId);
+ mgr.makePath(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeId, null, CreateMode.EPHEMERAL, true);
+ AutoScalingConfig cfg = stateManager.getAutoScalingConfig(null);
+ if (cfg.hasTriggerForEvents(TriggerEventType.NODEADDED)) {
+ mgr.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeId, null, CreateMode.EPHEMERAL, true);
+ }
+ }
+
public boolean simRestoreNode(String nodeId) throws Exception {
liveNodes.add(nodeId);
+ createEphemeralLiveNode(nodeId);
Set<String> collections = new HashSet<>();
lock.lock();
try {
@@ -307,8 +342,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
simAddReplica(message.getStr(CoreAdminParams.NODE), ri, true);
}
- // todo: maybe hook up DistribStateManager /clusterstate.json watchers?
-
/**
* Add a replica. Note that all details of the replica must be present here, including
* node, coreNodeName and SolrCore name.
@@ -376,8 +409,6 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
}
- // todo: maybe hook up DistribStateManager /clusterstate.json watchers?
-
/**
* Remove replica.
* @param nodeId node id
@@ -485,7 +516,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
if (ri.getVariables().remove(ZkStateReader.LEADER_PROP) != null) {
stateChanged.set(true);
}
- if (r.isActive(liveNodes)) {
+ if (r.isActive(liveNodes.get())) {
active.add(ri);
} else { // if it's on a node that is not live mark it down
if (!liveNodes.contains(r.getNodeName())) {
@@ -1055,10 +1086,15 @@ public class SimClusterStateProvider implements ClusterStateProvider {
/**
* Return all replica infos for a node.
* @param node node id
- * @return list of replicas on that node
+ * @return list of replicas on that node, or empty list if none
*/
public List<ReplicaInfo> simGetReplicaInfos(String node) {
- return nodeReplicaMap.get(node);
+ List<ReplicaInfo> replicas = nodeReplicaMap.get(node);
+ if (replicas == null) {
+ return Collections.emptyList();
+ } else {
+ return replicas;
+ }
}
/**
@@ -1091,7 +1127,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
@Override
public Set<String> getLiveNodes() {
- return liveNodes;
+ return liveNodes.get();
}
@Override
@@ -1101,7 +1137,7 @@ public class SimClusterStateProvider implements ClusterStateProvider {
@Override
public ClusterState getClusterState() throws IOException {
- return new ClusterState(0, liveNodes, getCollectionStates());
+ return new ClusterState(0, liveNodes.get(), getCollectionStates());
}
private Map<String, DocCollection> getCollectionStates() {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9e1c2490/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
index a570e6d..12b38e7 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimDistribStateManager.java
@@ -239,6 +239,23 @@ public class SimDistribStateManager implements DistribStateManager {
this.errorRef.set(actionError);
}
+ private SimDistribStateManager(String id, ExecutorService watchersPool, Node root, ActionThrottle actionThrottle,
+ ActionError actionError) {
+ this.id = id;
+ this.watchersPool = watchersPool;
+ this.root = root;
+ this.throttleRef.set(actionThrottle);
+ this.errorRef.set(actionError);
+ }
+
+ public SimDistribStateManager withEphemeralId(String id) {
+ return new SimDistribStateManager(id, watchersPool, root, throttleRef.get(), errorRef.get());
+ }
+
+ public Node getRoot() {
+ return root;
+ }
+
public void clear() {
root.clear();
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9e1c2490/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
index 65fb5b9..a9b3b5b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
@@ -47,12 +47,12 @@ public class SimNodeStateProvider implements NodeStateProvider {
private final Map<String, Map<String, Object>> nodeValues = new ConcurrentHashMap<>();
private final SimClusterStateProvider clusterStateProvider;
private final SimDistribStateManager stateManager;
- private final Set<String> liveNodes;
+ private final LiveNodesSet liveNodesSet;
- public SimNodeStateProvider(Set<String> liveNodes, SimDistribStateManager stateManager,
+ public SimNodeStateProvider(LiveNodesSet liveNodesSet, SimDistribStateManager stateManager,
SimClusterStateProvider clusterStateProvider,
Map<String, Map<String, Object>> nodeValues) {
- this.liveNodes = liveNodes;
+ this.liveNodesSet = liveNodesSet;
this.stateManager = stateManager;
this.clusterStateProvider = clusterStateProvider;
if (nodeValues != null) {
@@ -226,7 +226,7 @@ public class SimNodeStateProvider implements NodeStateProvider {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
LOG.trace("-- requested values for " + node + ": " + tags);
- if (!liveNodes.contains(node)) {
+ if (!liveNodesSet.contains(node)) {
nodeValues.remove(node);
return Collections.emptyMap();
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9e1c2490/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestClusterStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestClusterStateProvider.java
index 38dd7dc..cb3bb4c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestClusterStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestClusterStateProvider.java
@@ -169,15 +169,30 @@ public class TestClusterStateProvider extends SolrCloudTestCase {
@Test
public void testAddRemoveNode() throws Exception {
Set<String> lastNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes());
+ List<String> liveNodes = cloudManager.getDistribStateManager().listData(ZkStateReader.LIVE_NODES_ZKNODE);
+ assertEquals(lastNodes.size(), liveNodes.size());
+ liveNodes.removeAll(lastNodes);
+ assertTrue(liveNodes.isEmpty());
+
String node = addNode();
- Thread.sleep(2000);
+ cloudManager.getTimeSource().sleep(2000);
assertFalse(lastNodes.contains(node));
- assertTrue(cloudManager.getClusterStateProvider().getLiveNodes().contains(node));
+ lastNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes());
+ assertTrue(lastNodes.contains(node));
+ liveNodes = cloudManager.getDistribStateManager().listData(ZkStateReader.LIVE_NODES_ZKNODE);
+ assertEquals(lastNodes.size(), liveNodes.size());
+ liveNodes.removeAll(lastNodes);
+ assertTrue(liveNodes.isEmpty());
+
node = deleteNode();
- Thread.sleep(2000);
+ cloudManager.getTimeSource().sleep(2000);
assertTrue(lastNodes.contains(node));
- assertFalse(cloudManager.getClusterStateProvider().getLiveNodes().contains(node));
- }
+ lastNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes());
+ assertFalse(lastNodes.contains(node));
+ liveNodes = cloudManager.getDistribStateManager().listData(ZkStateReader.LIVE_NODES_ZKNODE);
+ assertEquals(lastNodes.size(), liveNodes.size());
+ liveNodes.removeAll(lastNodes);
+ assertTrue(liveNodes.isEmpty()); }
@Test
public void testAutoScalingConfig() throws Exception {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9e1c2490/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeAddedTrigger.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeAddedTrigger.java
new file mode 100644
index 0000000..c1f10d0
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeAddedTrigger.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.solr.cloud.autoscaling.ActionContext;
+import org.apache.solr.cloud.autoscaling.AutoScaling;
+import org.apache.solr.cloud.autoscaling.NodeAddedTrigger;
+import org.apache.solr.cloud.autoscaling.TriggerAction;
+import org.apache.solr.cloud.autoscaling.TriggerEvent;
+import org.apache.solr.common.util.TimeSource;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test for {@link NodeAddedTrigger}
+ */
+public class TestNodeAddedTrigger extends SimSolrCloudTestCase {
+ private static AtomicBoolean actionConstructorCalled = new AtomicBoolean(false);
+ private static AtomicBoolean actionInitCalled = new AtomicBoolean(false);
+ private static AtomicBoolean actionCloseCalled = new AtomicBoolean(false);
+
+ private AutoScaling.TriggerEventProcessor noFirstRunProcessor = event -> {
+ fail("Did not expect the listener to fire on first run!");
+ return true;
+ };
+
+ private static int SPEED = 50;
+
+ // currentTimeMillis is not as precise so to avoid false positives while comparing time of fire, we add some delta
+ private static final long WAIT_FOR_DELTA_NANOS = TimeUnit.MILLISECONDS.toNanos(2);
+
+ private static TimeSource timeSource;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ cluster = SimCloudManager.createCluster(1, TimeSource.get("simTime:" + SPEED));
+ timeSource = cluster.getTimeSource();
+ }
+
+ @Before
+ public void beforeTest() throws Exception {
+ actionConstructorCalled = new AtomicBoolean(false);
+ actionInitCalled = new AtomicBoolean(false);
+ actionCloseCalled = new AtomicBoolean(false);
+ }
+
+ @Test
+ public void testTrigger() throws Exception {
+ long waitForSeconds = 1 + random().nextInt(5);
+ Map<String, Object> props = createTriggerProps(waitForSeconds);
+
+ try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, cluster.getLoader(), cluster)) {
+ trigger.setProcessor(noFirstRunProcessor);
+ trigger.run();
+
+ String newNode1 = cluster.simAddNode();
+ String newNode2 = cluster.simAddNode();
+ AtomicBoolean fired = new AtomicBoolean(false);
+ AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
+ trigger.setProcessor(event -> {
+ if (fired.compareAndSet(false, true)) {
+ eventRef.set(event);
+ long currentTimeNanos = timeSource.getTime();
+ long eventTimeNanos = event.getEventTime();
+ long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+ if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+ fail("NodeAddedListener was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" + eventTimeNanos + ",waitForNanos=" + waitForNanos);
+ }
+ } else {
+ fail("NodeAddedTrigger was fired more than once!");
+ }
+ return true;
+ });
+ int counter = 0;
+ do {
+ trigger.run();
+ timeSource.sleep(1000);
+ if (counter++ > 10) {
+ fail("Newly added node was not discovered by trigger even after 10 seconds");
+ }
+ } while (!fired.get());
+
+ TriggerEvent nodeAddedEvent = eventRef.get();
+ assertNotNull(nodeAddedEvent);
+ List<String> nodeNames = (List<String>)nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
+ assertTrue(nodeNames.contains(newNode1));
+ assertTrue(nodeNames.contains(newNode2));
+ }
+
+ // add a new node but remove it before the waitFor period expires
+ // and assert that the trigger doesn't fire at all
+ try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, cluster.getLoader(), cluster)) {
+ final long waitTime = 2;
+ props.put("waitFor", waitTime);
+ trigger.setProcessor(noFirstRunProcessor);
+ trigger.run();
+
+ String newNode = cluster.simAddNode();
+ AtomicBoolean fired = new AtomicBoolean(false);
+ trigger.setProcessor(event -> {
+ if (fired.compareAndSet(false, true)) {
+ long currentTimeNanos = timeSource.getTime();
+ long eventTimeNanos = event.getEventTime();
+ long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+ if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+ fail("NodeAddedListener was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" + eventTimeNanos + ",waitForNanos=" + waitForNanos);
+ }
+ } else {
+ fail("NodeAddedTrigger was fired more than once!");
+ }
+ return true;
+ });
+ trigger.run(); // first run should detect the new node
+ cluster.simRemoveNode(newNode, true);
+ int counter = 0;
+ do {
+ trigger.run();
+ timeSource.sleep(1000);
+ if (counter++ > waitTime + 1) { // run it a little more than the wait time
+ break;
+ }
+ } while (true);
+
+ // ensure the event was not fired
+ assertFalse(fired.get());
+ }
+ }
+
+ public void testActionLifecycle() throws Exception {
+ Map<String, Object> props = createTriggerProps(0);
+ List<Map<String, String>> actions = (List<Map<String, String>>) props.get("actions");
+ Map<String, String> action = new HashMap<>(2);
+ action.put("name", "testActionInit");
+ action.put("class", TestNodeAddedTrigger.AssertInitTriggerAction.class.getName());
+ actions.add(action);
+ try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, cluster.getLoader(), cluster)) {
+ assertEquals(true, actionConstructorCalled.get());
+ assertEquals(false, actionInitCalled.get());
+ assertEquals(false, actionCloseCalled.get());
+ trigger.init();
+ assertEquals(true, actionInitCalled.get());
+ assertEquals(false, actionCloseCalled.get());
+ }
+ assertEquals(true, actionCloseCalled.get());
+ }
+
+ public static class AssertInitTriggerAction implements TriggerAction {
+ public AssertInitTriggerAction() {
+ actionConstructorCalled.set(true);
+ }
+
+ @Override
+ public String getName() {
+ return "";
+ }
+
+ @Override
+ public void process(TriggerEvent event, ActionContext actionContext) {
+
+ }
+
+ @Override
+ public void close() throws IOException {
+ actionCloseCalled.compareAndSet(false, true);
+ }
+
+ @Override
+ public void init(Map<String, String> args) {
+ actionInitCalled.compareAndSet(false, true);
+ }
+ }
+
+ @Test
+ public void testListenerAcceptance() throws Exception {
+ Map<String, Object> props = createTriggerProps(0);
+ try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, cluster.getLoader(), cluster)) {
+ trigger.setProcessor(noFirstRunProcessor);
+ trigger.run(); // starts tracking live nodes
+
+ String newNode = cluster.simAddNode();
+ AtomicInteger callCount = new AtomicInteger(0);
+ AtomicBoolean fired = new AtomicBoolean(false);
+
+ trigger.setProcessor(event -> {
+ if (callCount.incrementAndGet() < 2) {
+ return false;
+ } else {
+ fired.compareAndSet(false, true);
+ return true;
+ }
+ });
+
+ trigger.run(); // first run should detect the new node and fire immediately but listener isn't ready
+ assertEquals(1, callCount.get());
+ assertFalse(fired.get());
+ trigger.run(); // second run should again fire
+ assertEquals(2, callCount.get());
+ assertTrue(fired.get());
+ trigger.run(); // should not fire
+ assertEquals(2, callCount.get());
+ }
+ }
+
+ @Test
+ public void testRestoreState() throws Exception {
+ long waitForSeconds = 1 + random().nextInt(5);
+ Map<String, Object> props = createTriggerProps(waitForSeconds);
+
+ // add a new node but update the trigger before the waitFor period expires
+ // and assert that the new trigger still fires
+ NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, cluster.getLoader(), cluster);
+ trigger.setProcessor(noFirstRunProcessor);
+ trigger.run();
+
+ String newNode = cluster.simAddNode();
+ trigger.run(); // this run should detect the new node
+ trigger.close(); // close the old trigger
+
+ try (NodeAddedTrigger newTrigger = new NodeAddedTrigger("some_different_name", props, cluster.getLoader(), cluster)) {
+ try {
+ newTrigger.restoreState(trigger);
+ fail("Trigger should only be able to restore state from an old trigger of the same name");
+ } catch (AssertionError e) {
+ // expected
+ }
+ }
+
+ try (NodeAddedTrigger newTrigger = new NodeAddedTrigger("node_added_trigger", props, cluster.getLoader(), cluster)) {
+ AtomicBoolean fired = new AtomicBoolean(false);
+ AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
+ newTrigger.setProcessor(event -> {
+ if (fired.compareAndSet(false, true)) {
+ eventRef.set(event);
+ long currentTimeNanos = timeSource.getTime();
+ long eventTimeNanos = event.getEventTime();
+ long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+ if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+ fail("NodeAddedListener was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" + eventTimeNanos + ",waitForNanos=" + waitForNanos);
+ }
+ } else {
+ fail("NodeAddedTrigger was fired more than once!");
+ }
+ return true;
+ });
+ newTrigger.restoreState(trigger); // restore state from the old trigger
+ int counter = 0;
+ do {
+ newTrigger.run();
+ timeSource.sleep(1000);
+ if (counter++ > 10) {
+ fail("Newly added node was not discovered by trigger even after 10 seconds");
+ }
+ } while (!fired.get());
+
+ // ensure the event was fired
+ assertTrue(fired.get());
+ TriggerEvent nodeAddedEvent = eventRef.get();
+ assertNotNull(nodeAddedEvent);
+ //TODO assertEquals("", newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME));
+ }
+ }
+
+ private Map<String, Object> createTriggerProps(long waitForSeconds) {
+ Map<String, Object> props = new HashMap<>();
+ props.put("event", "nodeAdded"); // fixed: was "nodeLost", a copy/paste from TestNodeLostTrigger - this test exercises NodeAddedTrigger
+ props.put("waitFor", waitForSeconds);
+ props.put("enabled", true);
+ List<Map<String, String>> actions = new ArrayList<>(3);
+ Map<String, String> map = new HashMap<>(2);
+ map.put("name", "compute_plan");
+ map.put("class", "solr.ComputePlanAction");
+ actions.add(map);
+ map = new HashMap<>(2);
+ map.put("name", "execute_plan");
+ map.put("class", "solr.ExecutePlanAction");
+ actions.add(map);
+ props.put("actions", actions);
+ return props;
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9e1c2490/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeLostTrigger.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeLostTrigger.java
new file mode 100644
index 0000000..18ee355
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestNodeLostTrigger.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.cloud.autoscaling.ActionContext;
+import org.apache.solr.cloud.autoscaling.AutoScaling;
+import org.apache.solr.cloud.autoscaling.NodeLostTrigger;
+import org.apache.solr.cloud.autoscaling.TriggerAction;
+import org.apache.solr.cloud.autoscaling.TriggerEvent;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.core.CoreContainer;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test for {@link NodeLostTrigger}
+ */
+public class TestNodeLostTrigger extends SimSolrCloudTestCase {
+ private static AtomicBoolean actionConstructorCalled = new AtomicBoolean(false);
+ private static AtomicBoolean actionInitCalled = new AtomicBoolean(false);
+ private static AtomicBoolean actionCloseCalled = new AtomicBoolean(false);
+
+ private AutoScaling.TriggerEventProcessor noFirstRunProcessor = event -> {
+ fail("Did not expect the listener to fire on first run!");
+ return true;
+ };
+
+ private static final int SPEED = 50;
+ // use the same time source as the trigger
+ private static TimeSource timeSource;
+ // currentTimeMillis is not as precise so to avoid false positives while comparing time of fire, we add some delta
+ private static final long WAIT_FOR_DELTA_NANOS = TimeUnit.MILLISECONDS.toNanos(5);
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ cluster = SimCloudManager.createCluster(5, TimeSource.get("simTime:" + SPEED));
+ timeSource = cluster.getTimeSource();
+ }
+
+ @Before
+ public void beforeTest() throws Exception {
+ actionConstructorCalled = new AtomicBoolean(false);
+ actionInitCalled = new AtomicBoolean(false);
+ actionCloseCalled = new AtomicBoolean(false);
+ }
+
+ @Test
+ public void testTrigger() throws Exception {
+ long waitForSeconds = 1 + random().nextInt(5);
+ Map<String, Object> props = createTriggerProps(waitForSeconds);
+
+ try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, cluster.getLoader(), cluster)) {
+ trigger.setProcessor(noFirstRunProcessor);
+ trigger.run();
+ Iterator<String> it = cluster.getLiveNodesSet().get().iterator();
+ String lostNodeName1 = it.next();
+ String lostNodeName2 = it.next();
+ cluster.simRemoveNode(lostNodeName1, true);
+ cluster.simRemoveNode(lostNodeName2, true);
+ timeSource.sleep(1000);
+
+ AtomicBoolean fired = new AtomicBoolean(false);
+ AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
+ trigger.setProcessor(event -> {
+ if (fired.compareAndSet(false, true)) {
+ eventRef.set(event);
+ long currentTimeNanos = timeSource.getTime();
+ long eventTimeNanos = event.getEventTime();
+ long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+ if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+ fail("NodeLostListener was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" + eventTimeNanos + ",waitForNanos=" + waitForNanos);
+ }
+ } else {
+ fail("NodeLostListener was fired more than once!");
+ }
+ return true;
+ });
+ int counter = 0;
+ do {
+ trigger.run();
+ timeSource.sleep(1000);
+ if (counter++ > 10) {
+ fail("Lost node was not discovered by trigger even after 10 seconds");
+ }
+ } while (!fired.get());
+
+ TriggerEvent nodeLostEvent = eventRef.get();
+ assertNotNull(nodeLostEvent);
+ List<String> nodeNames = (List<String>)nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
+ assertTrue(nodeNames + " doesn't contain " + lostNodeName1, nodeNames.contains(lostNodeName1));
+ assertTrue(nodeNames + " doesn't contain " + lostNodeName2, nodeNames.contains(lostNodeName2));
+
+ }
+
+ // remove a node but add it back before the waitFor period expires
+ // and assert that the trigger doesn't fire at all
+ final long waitTime = 2;
+ props.put("waitFor", waitTime); // must be set BEFORE constructing the trigger - props are read in the constructor
+ try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, cluster.getLoader(), cluster)) {
+ trigger.setProcessor(noFirstRunProcessor);
+ trigger.run();
+
+ String lostNode = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+ cluster.simRemoveNode(lostNode, false);
+ AtomicBoolean fired = new AtomicBoolean(false);
+ trigger.setProcessor(event -> {
+ if (fired.compareAndSet(false, true)) {
+ long currentTimeNanos = timeSource.getTime();
+ long eventTimeNanos = event.getEventTime();
+ long waitForNanos = TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+ if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+ fail("NodeLostListener was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" + eventTimeNanos + ",waitForNanos=" + waitForNanos);
+ }
+ } else {
+ fail("NodeLostListener was fired more than once!");
+ }
+ return true;
+ });
+ trigger.run(); // first run should detect the lost node
+ int counter = 0;
+ do {
+ if (cluster.getLiveNodesSet().get().size() == 2) {
+ break;
+ }
+ timeSource.sleep(100);
+ if (counter++ > 20) {
+ fail("Live nodes not updated!");
+ }
+ } while (true);
+ counter = 0;
+ cluster.getSimClusterStateProvider().simRestoreNode(lostNode);
+ do {
+ trigger.run();
+ timeSource.sleep(1000);
+ if (counter++ > waitTime + 1) { // run it a little more than the wait time
+ break;
+ }
+ } while (true);
+
+ // ensure the event was not fired
+ assertFalse(fired.get());
+ }
+ }
+
+ public void testActionLifecycle() throws Exception {
+ Map<String, Object> props = createTriggerProps(0);
+ List<Map<String, String>> actions = (List<Map<String, String>>) props.get("actions");
+ Map<String, String> action = new HashMap<>(2);
+ action.put("name", "testActionInit");
+ action.put("class", AssertInitTriggerAction.class.getName());
+ actions.add(action);
+ try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, cluster.getLoader(), cluster)) { // fixed name: was "node_added_trigger" (copy/paste) - misleading in logs for a NodeLostTrigger
+ assertEquals(true, actionConstructorCalled.get());
+ assertEquals(false, actionInitCalled.get());
+ assertEquals(false, actionCloseCalled.get());
+ trigger.init();
+ assertEquals(true, actionInitCalled.get());
+ assertEquals(false, actionCloseCalled.get());
+ }
+ assertEquals(true, actionCloseCalled.get());
+ }
+
+ public static class AssertInitTriggerAction implements TriggerAction {
+ public AssertInitTriggerAction() {
+ actionConstructorCalled.set(true);
+ }
+
+ @Override
+ public String getName() {
+ return "";
+ }
+
+ @Override
+ public void process(TriggerEvent event, ActionContext actionContext) {
+
+ }
+
+ @Override
+ public void close() throws IOException {
+ actionCloseCalled.compareAndSet(false, true);
+ }
+
+ @Override
+ public void init(Map<String, String> args) {
+ actionInitCalled.compareAndSet(false, true);
+ }
+ }
+
+ @Test
+ public void testListenerAcceptance() throws Exception {
+ Map<String, Object> props = createTriggerProps(0);
+ try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, cluster.getLoader(), cluster)) { // fixed name: was "node_added_trigger" (copy/paste) - misleading in logs for a NodeLostTrigger
+ trigger.setProcessor(noFirstRunProcessor);
+
+ String newNode = cluster.simAddNode();
+
+ trigger.run(); // starts tracking live nodes
+
+ // stop the newly created node
+ cluster.simRemoveNode(newNode, true);
+
+ AtomicInteger callCount = new AtomicInteger(0);
+ AtomicBoolean fired = new AtomicBoolean(false);
+
+ trigger.setProcessor(event -> {
+ if (callCount.incrementAndGet() < 2) {
+ return false;
+ } else {
+ fired.compareAndSet(false, true);
+ return true;
+ }
+ });
+
+ trigger.run(); // first run should detect the lost node and fire immediately but listener isn't ready
+ assertEquals(1, callCount.get());
+ assertFalse(fired.get());
+ trigger.run(); // second run should again fire
+ assertEquals(2, callCount.get());
+ assertTrue(fired.get());
+ trigger.run(); // should not fire
+ assertEquals(2, callCount.get());
+ }
+ }
+
+ @Test
+ public void testRestoreState() throws Exception {
+ long waitForSeconds = 1 + random().nextInt(5);
+ Map<String, Object> props = createTriggerProps(waitForSeconds);
+
+ String newNode = cluster.simAddNode();
+
+ // remove a node but update the trigger before the waitFor period expires
+ // and assert that the new trigger still fires
+
+ NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, cluster.getLoader(), cluster);
+ trigger.setProcessor(noFirstRunProcessor);
+ trigger.run();
+
+ // stop the newly created node
+ cluster.simRemoveNode(newNode, true);
+
+ trigger.run(); // this run should detect the lost node
+ trigger.close(); // close the old trigger
+
+ try (NodeLostTrigger newTrigger = new NodeLostTrigger("some_different_name", props, cluster.getLoader(), cluster)) {
+ try {
+ newTrigger.restoreState(trigger);
+ fail("Trigger should only be able to restore state from an old trigger of the same name");
+ } catch (AssertionError e) {
+ // expected
+ }
+ }
+
+ try (NodeLostTrigger newTrigger = new NodeLostTrigger("node_lost_trigger", props, cluster.getLoader(), cluster)) {
+ AtomicBoolean fired = new AtomicBoolean(false);
+ AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
+ newTrigger.setProcessor(event -> {
+ if (fired.compareAndSet(false, true)) {
+ eventRef.set(event);
+ long currentTimeNanos = timeSource.getTime();
+ long eventTimeNanos = event.getEventTime();
+ long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+ if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+ fail("NodeLostListener was fired before the configured waitFor period: currentTimeNanos=" + currentTimeNanos + ", eventTimeNanos=" + eventTimeNanos + ",waitForNanos=" + waitForNanos);
+ }
+ } else {
+ fail("NodeLostListener was fired more than once!");
+ }
+ return true;
+ });
+ newTrigger.restoreState(trigger); // restore state from the old trigger
+ int counter = 0;
+ do {
+ newTrigger.run();
+ timeSource.sleep(1000);
+ if (counter++ > 10) {
+ fail("Lost node was not discovered by trigger even after 10 seconds");
+ }
+ } while (!fired.get());
+
+ TriggerEvent nodeLostEvent = eventRef.get();
+ assertNotNull(nodeLostEvent);
+ List<String> nodeNames = (List<String>)nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
+ assertTrue(nodeNames.contains(newNode));
+ }
+ }
+
+ private Map<String, Object> createTriggerProps(long waitForSeconds) {
+ Map<String, Object> props = new HashMap<>();
+ props.put("event", "nodeLost");
+ props.put("waitFor", waitForSeconds);
+ props.put("enabled", true);
+ List<Map<String, String>> actions = new ArrayList<>(3);
+ Map<String, String> map = new HashMap<>(2);
+ map.put("name", "compute_plan");
+ map.put("class", "solr.ComputePlanAction");
+ actions.add(map);
+ map = new HashMap<>(2);
+ map.put("name", "execute_plan");
+ map.put("class", "solr.ExecutePlanAction");
+ actions.add(map);
+ props.put("actions", actions);
+ return props;
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9e1c2490/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
index 179af76..e05b789 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
@@ -20,9 +20,11 @@ package org.apache.solr.cloud.autoscaling.sim;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -37,14 +39,17 @@ import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.autoscaling.ActionContext;
import org.apache.solr.cloud.autoscaling.NodeLostTrigger;
import org.apache.solr.cloud.autoscaling.ScheduledTriggers;
import org.apache.solr.cloud.autoscaling.TriggerActionBase;
import org.apache.solr.cloud.autoscaling.TriggerEvent;
+import org.apache.solr.cloud.autoscaling.TriggerEventQueue;
import org.apache.solr.cloud.autoscaling.TriggerListenerBase;
import org.apache.solr.cloud.autoscaling.CapturedEvent;
+import org.apache.solr.common.cloud.LiveNodesListener;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
@@ -602,12 +607,10 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
public static long eventQueueActionWait = 5000;
- // simulation framework doesn't support overseer
- /*
@Test
public void testEventQueue() throws Exception {
waitForSeconds = 1;
- CloudSolrClient solrClient = cluster.getSolrClient();
+ SolrClient solrClient = cluster.simGetSolrClient();
String setTriggerCommand = "{" +
"'set-trigger' : {" +
"'name' : 'node_added_trigger1'," +
@@ -616,56 +619,49 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
"'enabled' : true," +
"'actions' : [{'name':'test','class':'" + TestEventQueueAction.class.getName() + "'}]" +
"}}";
- NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
- String overseerLeader = (String) overSeerStatus.get("leader");
- int overseerLeaderIndex = 0;
- for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
- JettySolrRunner jetty = cluster.getJettySolrRunner(i);
- if (jetty.getNodeName().equals(overseerLeader)) {
- overseerLeaderIndex = i;
- break;
- }
- }
+
+ String overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+
SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
NamedList<Object> response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
- if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
+ if (!actionInitCalled.await(3000 / SPEED, TimeUnit.MILLISECONDS)) {
fail("The TriggerAction should have been created by now");
}
// add node to generate the event
- JettySolrRunner newNode = cluster.startJettySolrRunner();
- boolean await = actionStarted.await(60, TimeUnit.SECONDS);
+ String newNode = cluster.simAddNode();
+ boolean await = actionStarted.await(60000 / SPEED, TimeUnit.MILLISECONDS);
assertTrue("action did not start", await);
// event should be there
- NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
+ TriggerEvent nodeAddedEvent = events.iterator().next();
assertNotNull(nodeAddedEvent);
// but action did not complete yet so the event is still enqueued
assertFalse(triggerFired.get());
events.clear();
actionStarted = new CountDownLatch(1);
eventQueueActionWait = 1;
- // kill overseer leader
- cluster.stopJettySolrRunner(overseerLeaderIndex);
- Thread.sleep(5000);
+ // kill overseer
+ cluster.simRestartOverseer(overseerLeader);
+ cluster.getTimeSource().sleep(5000);
// new overseer leader should be elected and run triggers
- await = actionInterrupted.await(3, TimeUnit.SECONDS);
+ await = actionInterrupted.await(3000 / SPEED, TimeUnit.MILLISECONDS);
assertTrue("action wasn't interrupted", await);
// it should fire again from enqueued event
- await = actionStarted.await(60, TimeUnit.SECONDS);
+ await = actionStarted.await(60000 / SPEED, TimeUnit.MILLISECONDS);
assertTrue("action wasn't started", await);
TriggerEvent replayedEvent = events.iterator().next();
assertTrue(replayedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME) != null);
assertTrue(events + "\n" + replayedEvent.toString(), replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME) != null);
- await = actionCompleted.await(10, TimeUnit.SECONDS);
+ await = actionCompleted.await(10000 / SPEED, TimeUnit.MILLISECONDS);
assertTrue("action wasn't completed", await);
assertTrue(triggerFired.get());
}
@Test
public void testEventFromRestoredState() throws Exception {
- CloudSolrClient solrClient = cluster.getSolrClient();
+ SolrClient solrClient = cluster.simGetSolrClient();
String setTriggerCommand = "{" +
"'set-trigger' : {" +
"'name' : 'node_added_trigger'," +
@@ -678,45 +674,33 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
NamedList<Object> response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
- if (!actionInitCalled.await(10, TimeUnit.SECONDS)) {
+ if (!actionInitCalled.await(10000 / SPEED, TimeUnit.MILLISECONDS)) {
fail("The TriggerAction should have been created by now");
}
- NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
- String overseerLeader = (String) overSeerStatus.get("leader");
- int overseerLeaderIndex = 0;
- for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
- JettySolrRunner jetty = cluster.getJettySolrRunner(i);
- if (jetty.getNodeName().equals(overseerLeader)) {
- overseerLeaderIndex = i;
- break;
- }
- }
-
events.clear();
- JettySolrRunner newNode = cluster.startJettySolrRunner();
- boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+ String newNode = cluster.simAddNode();
+ boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
assertTrue("The trigger did not fire at all", await);
assertTrue(triggerFired.get());
// reset
triggerFired.set(false);
triggerFiredLatch = new CountDownLatch(1);
- NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
+ TriggerEvent nodeAddedEvent = events.iterator().next();
assertNotNull(nodeAddedEvent);
List<String> nodeNames = (List<String>)nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
- assertTrue(nodeNames.contains(newNode.getNodeName()));
+ assertTrue(nodeNames.contains(newNode));
// add a second node - state of the trigger will change but it won't fire for waitFor sec.
- JettySolrRunner newNode2 = cluster.startJettySolrRunner();
- Thread.sleep(10000);
- // kill overseer leader
- cluster.stopJettySolrRunner(overseerLeaderIndex);
- await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+ String newNode2 = cluster.simAddNode();
+ cluster.getTimeSource().sleep(10000);
+ // kill overseer
+ cluster.simRestartOverseer(null);
+ await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
assertTrue("The trigger did not fire at all", await);
assertTrue(triggerFired.get());
}
-
private static class TestLiveNodesListener implements LiveNodesListener {
Set<String> lostNodes = new HashSet<>();
Set<String> addedNodes = new HashSet<>();
@@ -745,7 +729,7 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
private TestLiveNodesListener registerLiveNodesListener() {
TestLiveNodesListener listener = new TestLiveNodesListener();
- zkStateReader.registerLiveNodesListener(listener);
+ cluster.getLiveNodesSet().registerLiveNodesListener(listener);
return listener;
}
@@ -789,47 +773,42 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
triggerFiredLatch = new CountDownLatch(2);
TestLiveNodesListener listener = registerLiveNodesListener();
- NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
- String overseerLeader = (String) overSeerStatus.get("leader");
- int overseerLeaderIndex = 0;
- for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
- JettySolrRunner jetty = cluster.getJettySolrRunner(i);
- if (jetty.getNodeName().equals(overseerLeader)) {
- overseerLeaderIndex = i;
- break;
- }
- }
+ SolrClient solrClient = cluster.simGetSolrClient();
+
+ // pick overseer node
+ String overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+
// add a node
- JettySolrRunner node = cluster.startJettySolrRunner();
- if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+ String node = cluster.simAddNode();
+ if (!listener.onChangeLatch.await(10000 / SPEED, TimeUnit.MILLISECONDS)) {
fail("onChange listener didn't execute on cluster change");
}
assertEquals(1, listener.addedNodes.size());
- assertEquals(node.getNodeName(), listener.addedNodes.iterator().next());
+ assertEquals(node, listener.addedNodes.iterator().next());
// verify that a znode doesn't exist (no trigger)
- String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node.getNodeName();
- assertFalse("Path " + pathAdded + " was created but there are no nodeAdded triggers", zkClient().exists(pathAdded, true));
+ String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node;
+ assertFalse("Path " + pathAdded + " was created but there are no nodeAdded triggers",
+ cluster.getDistribStateManager().hasData(pathAdded));
listener.reset();
// stop overseer
log.info("====== KILL OVERSEER 1");
- cluster.stopJettySolrRunner(overseerLeaderIndex);
- if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+ cluster.simRestartOverseer(overseerLeader);
+ if (!listener.onChangeLatch.await(10000 / SPEED, TimeUnit.MILLISECONDS)) {
fail("onChange listener didn't execute on cluster change");
}
assertEquals(1, listener.lostNodes.size());
assertEquals(overseerLeader, listener.lostNodes.iterator().next());
assertEquals(0, listener.addedNodes.size());
// wait until the new overseer is up
- Thread.sleep(5000);
+ cluster.getTimeSource().sleep(5000);
// verify that a znode does NOT exist - there's no nodeLost trigger,
// so the new overseer cleaned up existing nodeLost markers
String pathLost = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + overseerLeader;
- assertFalse("Path " + pathLost + " exists", zkClient().exists(pathLost, true));
+ assertFalse("Path " + pathLost + " exists", cluster.getDistribStateManager().hasData(pathLost));
listener.reset();
// set up triggers
- CloudSolrClient solrClient = cluster.getSolrClient();
log.info("====== ADD TRIGGERS");
String setTriggerCommand = "{" +
@@ -856,45 +835,37 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
- overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
- overseerLeader = (String) overSeerStatus.get("leader");
- overseerLeaderIndex = 0;
- for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
- JettySolrRunner jetty = cluster.getJettySolrRunner(i);
- if (jetty.getNodeName().equals(overseerLeader)) {
- overseerLeaderIndex = i;
- break;
- }
- }
+ overseerLeader = cluster.getSimClusterStateProvider().simGetRandomNode(random());
// create another node
log.info("====== ADD NODE 1");
- JettySolrRunner node1 = cluster.startJettySolrRunner();
- if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+ String node1 = cluster.simAddNode();
+ if (!listener.onChangeLatch.await(10000 / SPEED, TimeUnit.MILLISECONDS)) {
fail("onChange listener didn't execute on cluster change");
}
assertEquals(1, listener.addedNodes.size());
- assertEquals(node1.getNodeName(), listener.addedNodes.iterator().next());
+ assertEquals(node1, listener.addedNodes.iterator().next());
// verify that a znode exists
- pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node1.getNodeName();
- assertTrue("Path " + pathAdded + " wasn't created", zkClient().exists(pathAdded, true));
+ pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node1;
+ assertTrue("Path " + pathAdded + " wasn't created", cluster.getDistribStateManager().hasData(pathAdded));
- Thread.sleep(5000);
+ cluster.getTimeSource().sleep(5000);
// nodeAdded marker should be consumed now by nodeAdded trigger
- assertFalse("Path " + pathAdded + " should have been deleted", zkClient().exists(pathAdded, true));
+ assertFalse("Path " + pathAdded + " should have been deleted",
+ cluster.getDistribStateManager().hasData(pathAdded));
listener.reset();
events.clear();
triggerFiredLatch = new CountDownLatch(1);
// kill overseer again
log.info("====== KILL OVERSEER 2");
- cluster.stopJettySolrRunner(overseerLeaderIndex);
- if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+ cluster.simRestartOverseer(overseerLeader);
+ if (!listener.onChangeLatch.await(10000 / SPEED, TimeUnit.MILLISECONDS)) {
fail("onChange listener didn't execute on cluster change");
}
- if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
+ if (!triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS)) {
fail("Trigger should have fired by now");
}
assertEquals(1, events.size());
@@ -904,7 +875,6 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
assertEquals(TriggerEventType.NODELOST, ev.getEventType());
}
-*/
static Map<String, List<CapturedEvent>> listenerEvents = new ConcurrentHashMap<>();
static CountDownLatch listenerCreated = new CountDownLatch(1);
static boolean failDummyAction = false;