You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/12/14 11:56:51 UTC
[4/9] lucene-solr:master: SOLR-11285: Simulation framework for
autoscaling.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
new file mode 100644
index 0000000..a96a1d5
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimNodeStateProvider.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.apache.solr.client.solrj.cloud.autoscaling.NodeStateProvider;
+import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simulated {@link NodeStateProvider}.
+ * Note: in order to setup node-level metrics use {@link #simSetNodeValues(String, Map)}. However, in order
+ * to setup core-level metrics use {@link SimClusterStateProvider#simSetCollectionValue(String, String, Object, boolean)}.
+ */
+public class SimNodeStateProvider implements NodeStateProvider {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final Map<String, Map<String, Object>> nodeValues = new ConcurrentHashMap<>();
+ private final SimClusterStateProvider clusterStateProvider;
+ private final SimDistribStateManager stateManager;
+ private final LiveNodesSet liveNodesSet;
+
+ public SimNodeStateProvider(LiveNodesSet liveNodesSet, SimDistribStateManager stateManager,
+ SimClusterStateProvider clusterStateProvider,
+ Map<String, Map<String, Object>> nodeValues) {
+ this.liveNodesSet = liveNodesSet;
+ this.stateManager = stateManager;
+ this.clusterStateProvider = clusterStateProvider;
+ if (nodeValues != null) {
+ this.nodeValues.putAll(nodeValues);
+ }
+ }
+
+ // -------- simulator setup methods ------------
+
+ /**
+ * Get a node value
+ * @param node node id
+ * @param key property name
+ * @return property value or null if property or node doesn't exist.
+ */
+ public Object simGetNodeValue(String node, String key) {
+ Map<String, Object> values = nodeValues.get(node);
+ if (values == null) {
+ return null;
+ }
+ return values.get(key);
+ }
+
+ /**
+ * Set node values.
+ * NOTE: if values contain 'nodeRole' key then /roles.json is updated.
+ * @param node node id
+ * @param values values.
+ */
+ public void simSetNodeValues(String node, Map<String, Object> values) {
+ Map<String, Object> existing = nodeValues.computeIfAbsent(node, n -> new ConcurrentHashMap<>());
+ existing.clear();
+ if (values != null) {
+ existing.putAll(values);
+ }
+ if (values == null || values.isEmpty() || values.containsKey("nodeRole")) {
+ saveRoles();
+ }
+ }
+
+ /**
+ * Set a node value, replacing any previous value.
+ * NOTE: if key is 'nodeRole' then /roles.json is updated.
+ * @param node node id
+ * @param key property name
+ * @param value property value
+ */
+ public void simSetNodeValue(String node, String key, Object value) {
+ Map<String, Object> existing = nodeValues.computeIfAbsent(node, n -> new ConcurrentHashMap<>());
+ if (value == null) {
+ existing.remove(key);
+ } else {
+ existing.put(key, value);
+ }
+ if (key.equals("nodeRole")) {
+ saveRoles();
+ }
+ }
+
+ /**
+ * Add a node value, creating a list of values if necessary.
+ * NOTE: if key is 'nodeRole' then /roles.json is updated.
+ * @param node node id
+ * @param key property name
+ * @param value property value.
+ */
+ public void simAddNodeValue(String node, String key, Object value) {
+ Map<String, Object> values = nodeValues.computeIfAbsent(node, n -> new ConcurrentHashMap<>());
+ Object existing = values.get(key);
+ if (existing == null) {
+ values.put(key, value);
+ } else if (existing instanceof Set) {
+ ((Set)existing).add(value);
+ } else {
+ Set<Object> vals = new HashSet<>();
+ vals.add(existing);
+ vals.add(value);
+ values.put(key, vals);
+ }
+ if (key.equals("nodeRole")) {
+ saveRoles();
+ }
+ }
+
+ /**
+ * Remove node values. If values contained a 'nodeRole' key then
+ * /roles.json is updated.
+ * @param node node id
+ */
+ public void simRemoveNodeValues(String node) {
+ Map<String, Object> values = nodeValues.remove(node);
+ if (values != null && values.containsKey("nodeRole")) {
+ saveRoles();
+ }
+ }
+
+ /**
+ * Get all node values.
+ */
+ public Map<String, Map<String, Object>> simGetAllNodeValues() {
+ return nodeValues;
+ }
+
+ private synchronized void saveRoles() {
+ final Map<String, Set<String>> roles = new HashMap<>();
+ nodeValues.forEach((n, values) -> {
+ String nodeRole = (String)values.get("nodeRole");
+ if (nodeRole != null) {
+ roles.computeIfAbsent(nodeRole, role -> new HashSet<>()).add(n);
+ }
+ });
+ try {
+ stateManager.setData(ZkStateReader.ROLES, Utils.toJSON(roles), -1);
+ } catch (Exception e) {
+ throw new RuntimeException("Unexpected exception saving roles " + roles, e);
+ }
+ }
+
+ /**
+ * Simulate getting replica metrics values. This uses per-replica properties set in
+ * {@link SimClusterStateProvider#simSetCollectionValue(String, String, Object, boolean)} and
+ * similar methods.
+ * @param node node id
+ * @param tags metrics names
+ * @return map of metrics names / values
+ */
+ public Map<String, Object> getReplicaMetricsValues(String node, Collection<String> tags) {
+ List<ReplicaInfo> replicas = clusterStateProvider.simGetReplicaInfos(node);
+ if (replicas == null || replicas.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ Map<String, Object> values = new HashMap<>();
+ for (String tag : tags) {
+ String[] parts = tag.split(":");
+ if (parts.length < 3 || !parts[0].equals("metrics")) {
+ LOG.warn("Invalid metrics: tag: " + tag);
+ continue;
+ }
+ if (!parts[1].startsWith("solr.core.")) {
+ // skip - this is probably solr.node or solr.jvm metric
+ continue;
+ }
+ String[] collParts = parts[1].substring(10).split("\\.");
+ if (collParts.length != 3) {
+ LOG.warn("Invalid registry name: " + parts[1]);
+ continue;
+ }
+ String collection = collParts[0];
+ String shard = collParts[1];
+ String replica = collParts[2];
+ String key = parts.length > 3 ? parts[2] + ":" + parts[3] : parts[2];
+ replicas.forEach(r -> {
+ if (r.getCollection().equals(collection) && r.getShard().equals(shard) && r.getCore().endsWith(replica)) {
+ Object value = r.getVariables().get(key);
+ if (value != null) {
+ values.put(tag, value);
+ } else {
+ value = r.getVariables().get(tag);
+ if (value != null) {
+ values.put(tag, value);
+ }
+ }
+ }
+ });
+ }
+ return values;
+ }
+
+ // ---------- interface methods -------------
+
+ @Override
+ public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
+ LOG.trace("-- requested values for " + node + ": " + tags);
+ if (!liveNodesSet.contains(node)) {
+ nodeValues.remove(node);
+ return Collections.emptyMap();
+ }
+ if (tags.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ Map<String, Object> result = new HashMap<>();
+ Map<String, Object> metrics = getReplicaMetricsValues(node, tags.stream().filter(s -> s.startsWith("metrics:solr.core.")).collect(Collectors.toList()));
+ result.putAll(metrics);
+ Map<String, Object> values = nodeValues.get(node);
+ if (values == null) {
+ return result;
+ }
+ result.putAll(values.entrySet().stream().filter(e -> tags.contains(e.getKey())).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())));
+ return result;
+ }
+
+ @Override
+ public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
+ List<ReplicaInfo> replicas = clusterStateProvider.simGetReplicaInfos(node);
+ if (replicas == null || replicas.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ Map<String, Map<String, List<ReplicaInfo>>> res = new HashMap<>();
+ // TODO: probably needs special treatment for "metrics:solr.core..." tags
+ for (ReplicaInfo r : replicas) {
+ Map<String, List<ReplicaInfo>> perCollection = res.computeIfAbsent(r.getCollection(), s -> new HashMap<>());
+ List<ReplicaInfo> perShard = perCollection.computeIfAbsent(r.getShard(), s -> new ArrayList<>());
+ perShard.add(r);
+ }
+ return res;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimSolrCloudTestCase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimSolrCloudTestCase.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimSolrCloudTestCase.java
new file mode 100644
index 0000000..be7209b
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimSolrCloudTestCase.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CollectionStatePredicate;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.TimeOut;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
+
+/**
+ * Base class for simulated test cases. Tests that use this class should configure the simulated cluster
+ * in <code>@BeforeClass</code> like this:
+ * <pre>
+ * @BeforeClass
+ * public static void setupCluster() throws Exception {
+ * cluster = configureCluster(5, TimeSource.get("simTime:50"));
+ * }
+ * </pre>
+ */
+public class SimSolrCloudTestCase extends SolrTestCaseJ4 {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static final int DEFAULT_TIMEOUT = 90;
+
+ /** The cluster. */
+ protected static SimCloudManager cluster;
+
+ protected static void configureCluster(int nodeCount, TimeSource timeSource) throws Exception {
+ cluster = SimCloudManager.createCluster(nodeCount, timeSource);
+ }
+
+ @AfterClass
+ public static void shutdownCluster() throws Exception {
+ if (cluster != null) {
+ cluster.close();
+ }
+ cluster = null;
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ if (cluster != null) {
+ // clear any persisted auto scaling configuration
+ cluster.getDistribStateManager().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), -1);
+ // clean any persisted trigger state or events
+ removeChildren(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
+ removeChildren(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
+ removeChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
+ removeChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
+ cluster.getSimClusterStateProvider().simDeleteAllCollections();
+ cluster.simClearSystemCollection();
+ cluster.getSimClusterStateProvider().simResetLeaderThrottle();
+ cluster.simRestartOverseer(null);
+ }
+ }
+
+ @Before
+ public void checkClusterConfiguration() {
+ if (cluster == null)
+ throw new RuntimeException("SimCloudManager not configured - have you called configureCluster()?");
+ }
+
+ protected void removeChildren(String path) throws Exception {
+ if (!cluster.getDistribStateManager().hasData(path)) {
+ return;
+ }
+ List<String> children = cluster.getDistribStateManager().listData(path);
+ for (String c : children) {
+ if (cluster.getDistribStateManager().hasData(path + "/" + c)) {
+ try {
+ cluster.getDistribStateManager().removeData(path + "/" + c, -1);
+ } catch (NoSuchElementException e) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ /* Cluster helper methods ************************************/
+
+ /**
+ * Get the collection state for a particular collection
+ */
+ protected DocCollection getCollectionState(String collectionName) throws IOException {
+ return cluster.getClusterStateProvider().getClusterState().getCollection(collectionName);
+ }
+
+ /**
+ * Wait for a particular collection state to appear in the cluster client's state reader
+ *
+ * This is a convenience method using the {@link #DEFAULT_TIMEOUT}
+ *
+ * @param message a message to report on failure
+ * @param collection the collection to watch
+ * @param predicate a predicate to match against the collection state
+ */
+ protected long waitForState(String message, String collection, CollectionStatePredicate predicate) {
+ AtomicReference<DocCollection> state = new AtomicReference<>();
+ AtomicReference<Set<String>> liveNodesLastSeen = new AtomicReference<>();
+ try {
+ return waitForState(collection, DEFAULT_TIMEOUT, TimeUnit.SECONDS, (n, c) -> {
+ state.set(c);
+ liveNodesLastSeen.set(n);
+ return predicate.matches(n, c);
+ });
+ } catch (Exception e) {
+ throw new AssertionError(message + "\n" + "Live Nodes: " + liveNodesLastSeen.get() + "\nLast available state: " + state.get(), e);
+ }
+ }
+
+ /**
+ * Block until a CollectionStatePredicate returns true, or the wait times out
+ *
+ * Note that the predicate may be called again even after it has returned true, so
+ * implementors should avoid changing state within the predicate call itself.
+ *
+ * @param collection the collection to watch
+ * @param wait how long to wait
+ * @param unit the units of the wait parameter
+ * @param predicate the predicate to call on state changes
+ * @return number of milliseconds elapsed
+ * @throws InterruptedException on interrupt
+ * @throws TimeoutException on timeout
+ * @throws IOException on watcher register / unregister error
+ */
+ public long waitForState(final String collection, long wait, TimeUnit unit, CollectionStatePredicate predicate)
+ throws InterruptedException, TimeoutException, IOException {
+ TimeOut timeout = new TimeOut(wait, unit, cluster.getTimeSource());
+ long timeWarn = timeout.timeLeft(TimeUnit.MILLISECONDS) / 4;
+ while (!timeout.hasTimedOut()) {
+ ClusterState state = cluster.getClusterStateProvider().getClusterState();
+ DocCollection coll = state.getCollectionOrNull(collection);
+ // due to the way we manage collections in SimClusterStateProvider a null here
+ // can mean that a collection is still being created but has no replicas
+ if (coll == null) { // does not yet exist?
+ timeout.sleep(50);
+ continue;
+ }
+ if (predicate.matches(state.getLiveNodes(), coll)) {
+ log.trace("-- predicate matched with state {}", state);
+ return timeout.timeElapsed(TimeUnit.MILLISECONDS);
+ }
+ timeout.sleep(50);
+ if (timeout.timeLeft(TimeUnit.MILLISECONDS) < timeWarn) {
+ log.trace("-- still not matching predicate: {}", state);
+ }
+ }
+ throw new TimeoutException();
+ }
+
+ /**
+ * Return a {@link CollectionStatePredicate} that returns true if a collection has the expected
+ * number of shards and replicas
+ */
+ public static CollectionStatePredicate clusterShape(int expectedShards, int expectedReplicas) {
+ return (liveNodes, collectionState) -> {
+ if (collectionState == null)
+ return false;
+ if (collectionState.getSlices().size() != expectedShards)
+ return false;
+ for (Slice slice : collectionState) {
+ int activeReplicas = 0;
+ for (Replica replica : slice) {
+ if (replica.isActive(liveNodes))
+ activeReplicas++;
+ }
+ if (activeReplicas != expectedReplicas)
+ return false;
+ }
+ return true;
+ };
+ }
+
+ /**
+ * Get a (reproducibly) random shard from a {@link DocCollection}
+ */
+ protected static Slice getRandomShard(DocCollection collection) {
+ List<Slice> shards = new ArrayList<>(collection.getActiveSlices());
+ if (shards.size() == 0)
+ fail("Couldn't get random shard for collection as it has no shards!\n" + collection.toString());
+ Collections.shuffle(shards, random());
+ return shards.get(0);
+ }
+
+ /**
+ * Get a (reproducibly) random replica from a {@link Slice}
+ */
+ protected static Replica getRandomReplica(Slice slice) {
+ List<Replica> replicas = new ArrayList<>(slice.getReplicas());
+ if (replicas.size() == 0)
+ fail("Couldn't get random replica from shard as it has no replicas!\n" + slice.toString());
+ Collections.shuffle(replicas, random());
+ return replicas.get(0);
+ }
+
+ /**
+ * Get a (reproducibly) random replica from a {@link Slice} matching a predicate
+ */
+ protected static Replica getRandomReplica(Slice slice, Predicate<Replica> matchPredicate) {
+ List<Replica> replicas = new ArrayList<>(slice.getReplicas());
+ if (replicas.size() == 0)
+ fail("Couldn't get random replica from shard as it has no replicas!\n" + slice.toString());
+ Collections.shuffle(replicas, random());
+ for (Replica replica : replicas) {
+ if (matchPredicate.test(replica))
+ return replica;
+ }
+ fail("Couldn't get random replica that matched conditions\n" + slice.toString());
+ return null; // just to keep the compiler happy - fail will always throw an Exception
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestClusterStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestClusterStateProvider.java
new file mode 100644
index 0000000..396edea
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestClusterStateProvider.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.Preference;
+import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.rule.ImplicitSnitch;
+import org.apache.solr.common.params.CollectionAdminParams;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.Watcher;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test compares the cluster state of a real cluster and a simulated one.
+ * <p>A real cluster is always started. Depending on a random choice made once per test class run, the tests
+ * then use either a {@link SimCloudManager} seeded from a snapshot of the real cluster, or the real cluster's
+ * own {@link SolrCloudManager}. In the simulated case every change applied to the real cluster by the tests
+ * is mirrored into the simulator.</p>
+ */
+public class TestClusterStateProvider extends SolrCloudTestCase {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final int NODE_COUNT = 3;
+  /** Whether the tests run against simulated components. */
+  private static boolean simulated;
+
+  /** Manager under test - either the simulated one or the real one. */
+  private static SolrCloudManager cloudManager;
+
+  // snapshot of the real cluster, taken in init()
+  private static Collection<String> liveNodes;
+  private static Map<String, Object> clusterProperties;
+  private static AutoScalingConfig autoScalingConfig;
+  private static Map<String, Map<String, Map<String, List<ReplicaInfo>>>> replicas;
+  private static Map<String, Map<String, Object>> nodeValues;
+  private static ClusterState realState;
+
+  // set up a real cluster as the source of test data
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    simulated = random().nextBoolean();
+    LOG.info("####### Using simulated components? {}", simulated);
+
+    configureCluster(NODE_COUNT)
+        .addConfig("conf", configset("cloud-minimal"))
+        .configure();
+    CollectionAdminRequest.createCollection(CollectionAdminParams.SYSTEM_COLL, null, 1, 2, 0, 1)
+        .process(cluster.getSolrClient());
+    init();
+  }
+
+  /**
+   * Close the simulated manager (the real one is obtained from a ZkController and is not closed here)
+   * and release all static references so they do not outlive this test class.
+   */
+  @AfterClass
+  public static void closeCloudManager() throws Exception {
+    try {
+      if (simulated && cloudManager != null) {
+        cloudManager.close();
+      }
+    } finally {
+      cloudManager = null;
+      liveNodes = null;
+      clusterProperties = null;
+      autoScalingConfig = null;
+      replicas = null;
+      nodeValues = null;
+      realState = null;
+    }
+  }
+
+  /**
+   * Snapshot the real cluster and select the manager under test. When simulating, the snapshot
+   * is loaded into a new {@link SimCloudManager} and its cluster state is verified against the real one.
+   */
+  private static void init() throws Exception {
+    SolrCloudManager realManager = cluster.getJettySolrRunner(cluster.getJettySolrRunners().size() - 1).getCoreContainer()
+        .getZkController().getSolrCloudManager();
+    liveNodes = realManager.getClusterStateProvider().getLiveNodes();
+    clusterProperties = realManager.getClusterStateProvider().getClusterProperties();
+    autoScalingConfig = realManager.getDistribStateManager().getAutoScalingConfig();
+    replicas = new HashMap<>();
+    nodeValues = new HashMap<>();
+    liveNodes.forEach(n -> {
+      replicas.put(n, realManager.getNodeStateProvider().getReplicaInfo(n, Collections.emptySet()));
+      nodeValues.put(n, realManager.getNodeStateProvider().getNodeValues(n, ImplicitSnitch.tags));
+    });
+    realState = realManager.getClusterStateProvider().getClusterState();
+
+    if (simulated) {
+      // initialize simulated provider
+      SimCloudManager simCloudManager = new SimCloudManager(TimeSource.get("simTime:10"));
+      simCloudManager.getSimClusterStateProvider().simSetClusterProperties(clusterProperties);
+      simCloudManager.getSimDistribStateManager().simSetAutoScalingConfig(autoScalingConfig);
+      nodeValues.forEach((n, values) -> simCloudManager.getSimNodeStateProvider().simSetNodeValues(n, values));
+      simCloudManager.getSimClusterStateProvider().simSetClusterState(realState);
+      ClusterState simState = simCloudManager.getClusterStateProvider().getClusterState();
+      assertClusterStateEquals(realState, simState);
+      cloudManager = simCloudManager;
+    } else {
+      cloudManager = realManager;
+    }
+  }
+
+  /** Assert that two cluster states have the same live nodes, collections, shards and replicas. */
+  private static void assertClusterStateEquals(ClusterState one, ClusterState two) {
+    assertEquals(one.getLiveNodes(), two.getLiveNodes());
+    assertEquals(one.getCollectionsMap().keySet(), two.getCollectionsMap().keySet());
+    one.forEachCollection(oneColl -> {
+      DocCollection twoColl = two.getCollection(oneColl.getName());
+      Map<String, Slice> oneSlices = oneColl.getSlicesMap();
+      Map<String, Slice> twoSlices = twoColl.getSlicesMap();
+      assertEquals(oneSlices.keySet(), twoSlices.keySet());
+      oneSlices.forEach((s, slice) -> {
+        Slice sTwo = twoSlices.get(s);
+        for (Replica oneReplica : slice.getReplicas()) {
+          Replica twoReplica = sTwo.getReplica(oneReplica.getName());
+          assertNotNull(twoReplica);
+          assertEquals(oneReplica, twoReplica);
+        }
+      });
+    });
+  }
+
+  /** Start a new real node and, when simulating, add the same node to the simulator. */
+  private String addNode() throws Exception {
+    JettySolrRunner solr = cluster.startJettySolrRunner();
+    String nodeId = solr.getNodeName();
+    if (simulated) {
+      ((SimCloudManager) cloudManager).getSimClusterStateProvider().simAddNode(nodeId);
+    }
+    return nodeId;
+  }
+
+  /** Stop the first real node and, when simulating, remove the same node from the simulator. */
+  private String deleteNode() throws Exception {
+    String nodeId = cluster.getJettySolrRunner(0).getNodeName();
+    cluster.stopJettySolrRunner(0);
+    if (simulated) {
+      ((SimCloudManager) cloudManager).getSimClusterStateProvider().simRemoveNode(nodeId);
+    }
+    return nodeId;
+  }
+
+  /** Write the autoscaling config to the real cluster and, when simulating, to the simulator. */
+  private void setAutoScalingConfig(AutoScalingConfig cfg) throws Exception {
+    cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getZkClient().setData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH,
+        Utils.toJSON(cfg), -1, true);
+    if (simulated) {
+      ((SimCloudManager) cloudManager).getSimDistribStateManager().simSetAutoScalingConfig(cfg);
+    }
+  }
+
+  /**
+   * Assert that the live nodes listed under {@link ZkStateReader#LIVE_NODES_ZKNODE} are exactly
+   * the expected ones.
+   */
+  private void assertLiveNodesConsistent(Set<String> expected) throws Exception {
+    List<String> zkLiveNodes = cloudManager.getDistribStateManager().listData(ZkStateReader.LIVE_NODES_ZKNODE);
+    assertEquals(expected.size(), zkLiveNodes.size());
+    zkLiveNodes.removeAll(expected);
+    assertTrue(zkLiveNodes.isEmpty());
+  }
+
+  @Test
+  public void testAddRemoveNode() throws Exception {
+    Set<String> lastNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes());
+    assertLiveNodesConsistent(lastNodes);
+
+    String node = addNode();
+    cloudManager.getTimeSource().sleep(2000);
+    assertFalse(lastNodes.contains(node));
+    lastNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes());
+    assertTrue(lastNodes.contains(node));
+    assertLiveNodesConsistent(lastNodes);
+
+    node = deleteNode();
+    cloudManager.getTimeSource().sleep(2000);
+    assertTrue(lastNodes.contains(node));
+    lastNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes());
+    assertFalse(lastNodes.contains(node));
+    assertLiveNodesConsistent(lastNodes);
+  }
+
+  @Test
+  public void testAutoScalingConfig() throws Exception {
+    final CountDownLatch triggered = new CountDownLatch(1);
+    Watcher w = ev -> {
+      if (triggered.getCount() == 0) {
+        fail("already triggered once!");
+      }
+      triggered.countDown();
+    };
+    AutoScalingConfig cfg = cloudManager.getDistribStateManager().getAutoScalingConfig(w);
+    assertEquals(autoScalingConfig, cfg);
+    Preference p = new Preference(Collections.singletonMap("maximize", "freedisk"));
+    cfg = cfg.withPolicy(cfg.getPolicy().withClusterPreferences(Collections.singletonList(p)));
+    setAutoScalingConfig(cfg);
+    if (!triggered.await(10, TimeUnit.SECONDS)) {
+      fail("Watch should be triggered on update!");
+    }
+    AutoScalingConfig cfg1 = cloudManager.getDistribStateManager().getAutoScalingConfig(null);
+    assertEquals(cfg, cfg1);
+
+    // restore
+    setAutoScalingConfig(autoScalingConfig);
+    cfg1 = cloudManager.getDistribStateManager().getAutoScalingConfig(null);
+    assertEquals(autoScalingConfig, cfg1);
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestComputePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestComputePlanAction.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestComputePlanAction.java
new file mode 100644
index 0000000..b7053d7d
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestComputePlanAction.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.autoscaling.ActionContext;
+import org.apache.solr.cloud.autoscaling.ComputePlanAction;
+import org.apache.solr.cloud.autoscaling.ScheduledTriggers;
+import org.apache.solr.cloud.autoscaling.TriggerAction;
+import org.apache.solr.cloud.autoscaling.TriggerEvent;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.LogLevel;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
+
+/**
+ * Test for {@link ComputePlanAction}.
+ *
+ * <p>Each test registers a nodeLost/nodeAdded trigger whose action chain is
+ * {@code ComputePlanAction} followed by {@link AssertingTriggerAction}. The latter captures the
+ * trigger event and the action context so that the computed operations can be inspected.
+ */
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.cloud.Overseer=DEBUG;org.apache.solr.cloud.overseer=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG;")
+public class TestComputePlanAction extends SimSolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** Set by {@link AssertingTriggerAction} the first time it captures a matching event. */
+  private static final AtomicBoolean fired = new AtomicBoolean(false);
+  /** Number of live nodes the cluster is trimmed back to before each test. */
+  private static final int NODE_COUNT = 1;
+  // volatile: reassigned by init() on the test thread but counted down by AssertingTriggerAction,
+  // which presumably runs on a trigger-processing thread
+  private static volatile CountDownLatch triggerFiredLatch = new CountDownLatch(1);
+  /** Properties of the {@link ActionContext} captured by {@link AssertingTriggerAction}. */
+  private static final AtomicReference<Map> actionContextPropsRef = new AtomicReference<>();
+  /** The trigger event captured by {@link AssertingTriggerAction}. */
+  private static final AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(NODE_COUNT, TimeSource.get("simTime:50"));
+  }
+
+  /**
+   * Prepares the cluster before each test.
+   *
+   * <ul>
+   *   <li>Resets the captured trigger state.</li>
+   *   <li>Trims the cluster back to {@link #NODE_COUNT} live nodes.</li>
+   *   <li>Installs the default cluster policy and preferences.</li>
+   *   <li>Sleeps for one default trigger cooldown period.</li>
+   * </ul>
+   */
+  @Before
+  public void init() throws Exception {
+    fired.set(false);
+    triggerFiredLatch = new CountDownLatch(1);
+    actionContextPropsRef.set(null);
+    // also clear the previously captured event so it cannot leak into this test's assertions
+    eventRef.set(null);
+
+    if (cluster.getClusterStateProvider().getLiveNodes().size() > NODE_COUNT) {
+      // stop some to get to original state
+      int numJetties = cluster.getClusterStateProvider().getLiveNodes().size();
+      for (int i = 0; i < numJetties - NODE_COUNT; i++) {
+        String node = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+        cluster.getSimClusterStateProvider().simRemoveNode(node);
+      }
+    }
+
+    String setClusterPolicyCommand = "{" +
+        " 'set-cluster-policy': [" +
+        " {'cores':'<10', 'node':'#ANY'}," +
+        " {'replica':'<2', 'shard': '#EACH', 'node': '#ANY'}," +
+        " {'nodeRole':'overseer', 'replica':0}" +
+        " ]" +
+        "}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand);
+    SolrResponse rsp = cluster.request(req);
+    NamedList<Object> response = rsp.getResponse();
+    assertEquals("success", response.get("result").toString());
+
+    String setClusterPreferencesCommand = "{" +
+        "'set-cluster-preferences': [" +
+        "{'minimize': 'cores'}," +
+        "{'maximize': 'freedisk','precision': 100}]" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPreferencesCommand);
+    rsp = cluster.request(req);
+    response = rsp.getResponse();
+    assertEquals("success", response.get("result").toString());
+    // wait one default trigger cooldown period (presumably so earlier trigger activity does not
+    // suppress this test's events -- confirm against ScheduledTriggers)
+    cluster.getTimeSource().sleep(TimeUnit.SECONDS.toMillis(ScheduledTriggers.DEFAULT_COOLDOWN_PERIOD_SECONDS));
+  }
+
+  /** Logs the simulated node values, live nodes and per-collection state after each test. */
+  @After
+  public void printState() throws Exception {
+    log.info("-------------_ FINAL STATE --------------");
+    log.info("* Node values: {}", Utils.toJSONString(cluster.getSimNodeStateProvider().simGetAllNodeValues()));
+    log.info("* Live nodes: {}", cluster.getClusterStateProvider().getLiveNodes());
+    ClusterState state = cluster.getClusterStateProvider().getClusterState();
+    for (String coll : cluster.getSimClusterStateProvider().simListCollections()) {
+      log.info("* Collection {} state: {}", coll, state.getCollection(coll));
+    }
+  }
+
+  /**
+   * Losing a node that hosts a replica should produce exactly one MOVEREPLICA operation for that
+   * replica.
+   */
+  @Test
+  public void testNodeLost() throws Exception {
+    // let's start a node so that we have at least two
+    String node = cluster.simAddNode();
+    AssertingTriggerAction.expectedNode = node;
+
+    SolrClient solrClient = cluster.simGetSolrClient();
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_lost_trigger'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '7s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
+        "{'name':'test','class':'" + TestComputePlanAction.AssertingTriggerAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection("testNodeLost",
+        "conf", 1, 2);
+    create.process(solrClient);
+
+    waitForState("Timed out waiting for replicas of new collection to be active",
+        "testNodeLost", clusterShape(1, 2));
+
+    ClusterState clusterState = cluster.getClusterStateProvider().getClusterState();
+    log.debug("-- cluster state: {}", clusterState);
+    DocCollection collection = clusterState.getCollection("testNodeLost");
+    List<Replica> replicas = collection.getReplicas(node);
+    assertNotNull(replicas);
+    assertFalse(replicas.isEmpty());
+
+    // start another node because when the other node goes away, the cluster policy requires only
+    // 1 replica per node and none on the overseer
+    String node2 = cluster.simAddNode();
+    assertTrue(node2 + " is not live yet", cluster.getClusterStateProvider().getClusterState().liveNodesContain(node2));
+
+    // stop the original node
+    cluster.simRemoveNode(node, false);
+    log.info("Stopped_node : {}", node);
+
+    assertTrue("Trigger was not fired even after 10 seconds", triggerFiredLatch.await(10, TimeUnit.SECONDS));
+    assertTrue(fired.get());
+    Map context = actionContextPropsRef.get();
+    assertNotNull(context);
+    List<SolrRequest> operations = (List<SolrRequest>) context.get("operations");
+    assertNotNull("The operations computed by ComputePlanAction should not be null, " + eventRef.get(), operations);
+    assertEquals("ComputePlanAction should have computed exactly 1 operation", 1, operations.size());
+    SolrRequest solrRequest = operations.get(0);
+    SolrParams params = solrRequest.getParams();
+    assertEquals("Expected MOVEREPLICA action after losing node", MOVEREPLICA, CollectionParams.CollectionAction.get(params.get("action")));
+    String replicaToBeMoved = params.get("replica");
+    assertEquals("Unexpected replica in computed operation", replicas.get(0).getName(), replicaToBeMoved);
+
+    // shutdown the extra node that we had started
+    cluster.simRemoveNode(node2, false);
+  }
+
+  /**
+   * Losing a node that hosts two replicas should produce two MOVEREPLICA operations, each moving
+   * one of the replicas that lived on the lost node.
+   */
+  @Test
+  public void testNodeWithMultipleReplicasLost() throws Exception {
+    AssertingTriggerAction.expectedNode = null;
+
+    // start 3 more nodes
+    cluster.simAddNode();
+    cluster.simAddNode();
+    cluster.simAddNode();
+
+    SolrClient solrClient = cluster.simGetSolrClient();
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_lost_trigger'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '1s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
+        "{'name':'test','class':'" + TestComputePlanAction.AssertingTriggerAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection("testNodeWithMultipleReplicasLost",
+        "conf", 2, 3);
+    create.process(solrClient);
+
+    waitForState("Timed out waiting for replicas of new collection to be active",
+        "testNodeWithMultipleReplicasLost", clusterShape(2, 3));
+
+    ClusterState clusterState = cluster.getClusterStateProvider().getClusterState();
+    log.debug("-- cluster state: {}", clusterState);
+    DocCollection docCollection = clusterState.getCollection("testNodeWithMultipleReplicasLost");
+
+    // let's find a node with exactly 2 replicas and stop it
+    String stoppedNodeName = null;
+    List<Replica> replicasToBeMoved = null;
+    for (String node : cluster.getClusterStateProvider().getLiveNodes()) {
+      List<Replica> replicas = docCollection.getReplicas(node);
+      if (replicas != null && replicas.size() == 2) {
+        stoppedNodeName = node;
+        replicasToBeMoved = replicas;
+        cluster.simRemoveNode(node, false);
+        break;
+      }
+    }
+    assertNotNull(stoppedNodeName);
+
+    assertTrue("Trigger was not fired even after 5 seconds", triggerFiredLatch.await(5, TimeUnit.SECONDS));
+    assertTrue(fired.get());
+
+    TriggerEvent triggerEvent = eventRef.get();
+    assertNotNull(triggerEvent);
+    assertEquals(TriggerEventType.NODELOST, triggerEvent.getEventType());
+    // TODO assertEquals(stoppedNodeName, triggerEvent.getProperty(TriggerEvent.NODE_NAME));
+
+    Map context = actionContextPropsRef.get();
+    assertNotNull(context);
+    List<SolrRequest> operations = (List<SolrRequest>) context.get("operations");
+    assertNotNull("The operations computed by ComputePlanAction should not be null, " + actionContextPropsRef.get(), operations);
+    operations.forEach(solrRequest -> log.info(solrRequest.getParams().toString()));
+    assertEquals("ComputePlanAction should have computed exactly 2 operations", 2, operations.size());
+
+    for (SolrRequest solrRequest : operations) {
+      SolrParams params = solrRequest.getParams();
+      assertEquals("Expected MOVEREPLICA action after losing node", MOVEREPLICA, CollectionParams.CollectionAction.get(params.get("action")));
+      String moved = params.get("replica");
+      assertTrue("Unexpected replica in computed operation: " + moved,
+          replicasToBeMoved.stream().anyMatch(replica -> replica.getName().equals(moved)));
+    }
+  }
+
+  /**
+   * Tightening the policy to 1 replica per shard per node and then adding a node should produce
+   * one MOVEREPLICA operation targeting the new node.
+   */
+  @Test
+  public void testNodeAdded() throws Exception {
+    AssertingTriggerAction.expectedNode = null;
+    SolrClient solrClient = cluster.simGetSolrClient();
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '1s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
+        "{'name':'test','class':'" + TestComputePlanAction.AssertingTriggerAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    // the default policy limits 1 replica per node, we need more right now
+    String setClusterPolicyCommand = "{" +
+        " 'set-cluster-policy': [" +
+        " {'cores':'<10', 'node':'#ANY'}," +
+        " {'replica':'<3', 'shard': '#EACH', 'node': '#ANY'}," +
+        " {'nodeRole':'overseer', 'replica':0}" +
+        " ]" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand);
+    response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection("testNodeAdded",
+        "conf", 1, 2);
+    create.process(solrClient);
+
+    waitForState("Timed out waiting for replicas of new collection to be active",
+        "testNodeAdded", (liveNodes, collectionState) -> collectionState.getReplicas().stream().allMatch(replica -> replica.isActive(liveNodes)));
+
+    // reset to the original policy which has only 1 replica per shard per node
+    setClusterPolicyCommand = "{" +
+        " 'set-cluster-policy': [" +
+        " {'cores':'<10', 'node':'#ANY'}," +
+        " {'replica':'<2', 'shard': '#EACH', 'node': '#ANY'}," +
+        " {'nodeRole':'overseer', 'replica':0}" +
+        " ]" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicyCommand);
+    response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    // start a node so that the 'violation' created by the previous policy update is fixed
+    String newNode = cluster.simAddNode();
+    assertTrue("Trigger was not fired even after 5 seconds", triggerFiredLatch.await(5, TimeUnit.SECONDS));
+    assertTrue(fired.get());
+    Map context = actionContextPropsRef.get();
+    assertNotNull(context);
+    log.info("Node values: {}", Utils.toJSONString(cluster.getSimNodeStateProvider().simGetAllNodeValues()));
+    log.info("Live nodes: {}, collection state: {}", cluster.getClusterStateProvider().getLiveNodes(),
+        cluster.getClusterStateProvider().getClusterState().getCollection("testNodeAdded"));
+    List<SolrRequest> operations = (List<SolrRequest>) context.get("operations");
+    assertNotNull("The operations computed by ComputePlanAction should not be null, " + context, operations);
+    assertEquals("ComputePlanAction should have computed exactly 1 operation, but was: " + operations, 1, operations.size());
+    SolrRequest request = operations.get(0);
+    SolrParams params = request.getParams();
+    assertEquals("Expected MOVEREPLICA action after adding node", MOVEREPLICA, CollectionParams.CollectionAction.get(params.get("action")));
+    String nodeAdded = params.get("targetNode");
+    assertEquals("Unexpected node in computed operation", newNode, nodeAdded);
+  }
+
+  /**
+   * Trigger action that records the first matching event and its action context into the static
+   * fields of the enclosing test and then releases {@code triggerFiredLatch}.
+   *
+   * <p>When {@link #expectedNode} is non-null, only events whose {@code NODE_NAMES} property
+   * contains that node are captured.
+   */
+  public static class AssertingTriggerAction implements TriggerAction {
+    // volatile: written by the test thread, read in process(), presumably on another thread
+    static volatile String expectedNode;
+
+    @Override
+    public String getName() {
+      return null;
+    }
+
+    @Override
+    public void process(TriggerEvent event, ActionContext context) {
+      if (expectedNode != null) {
+        Collection<?> nodes = (Collection<?>) event.getProperty(TriggerEvent.NODE_NAMES);
+        if (nodes == null || !nodes.contains(expectedNode)) {
+          return; // this is not the event we are looking for
+        }
+      }
+      // capture only the first matching event
+      if (fired.compareAndSet(false, true)) {
+        eventRef.set(event);
+        actionContextPropsRef.set(context.getProperties());
+        triggerFiredLatch.countDown();
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      // nothing to release
+    }
+
+    @Override
+    public void init(Map<String, String> args) {
+      // no configuration needed
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestDistribStateManager.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestDistribStateManager.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestDistribStateManager.java
new file mode 100644
index 0000000..a9c5140
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestDistribStateManager.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
+import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
+import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
+import org.apache.solr.client.solrj.impl.ZkDistribStateManager;
+import org.apache.solr.cloud.ZkTestServer;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test compares a ZK-based {@link DistribStateManager} to the simulated one.
+ *
+ * <p>Each test run randomly picks either a {@link SimDistribStateManager} or a
+ * {@link ZkDistribStateManager} backed by a {@link ZkTestServer}. Over repeated runs, both
+ * implementations are exercised against the same expectations.
+ */
+public class TestDistribStateManager extends SolrTestCaseJ4 {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private DistribStateManager stateManager;
+  private ZkTestServer zkTestServer;
+  private SolrZkClient solrZkClient;
+  /** True when this run uses the simulated implementation, false for the ZK-based one. */
+  private boolean simulated;
+  /** Root of the simulated tree; kept across {@link #reInit()} so persistent data survives. */
+  private SimDistribStateManager.Node root;
+
+  @Before
+  public void setup() throws Exception {
+    simulated = random().nextBoolean();
+    if (simulated) {
+      root = SimDistribStateManager.createNewRootNode();
+    } else {
+      zkTestServer = new ZkTestServer(createTempDir("zkDir").toString());
+      zkTestServer.run();
+    }
+    reInit();
+  }
+
+  /**
+   * Closes the current state manager (and ZK client, if any) and opens a fresh one over the same
+   * backing store. Tests use this to simulate a reconnect, e.g. to check ephemeral-node cleanup.
+   */
+  private void reInit() throws Exception {
+    if (stateManager != null) {
+      stateManager.close();
+    }
+    if (simulated) {
+      stateManager = new SimDistribStateManager(root);
+    } else {
+      if (solrZkClient != null) {
+        solrZkClient.close();
+      }
+      solrZkClient = new SolrZkClient(zkTestServer.getZkHost(), 30000);
+      stateManager = new ZkDistribStateManager(solrZkClient);
+    }
+    LOG.info("Using {}", stateManager.getClass().getName());
+  }
+
+  @After
+  public void teardown() throws Exception {
+    // close in dependency order: the state manager may use the ZK client, which needs the server
+    if (stateManager != null) {
+      stateManager.close();
+      stateManager = null;
+    }
+    if (solrZkClient != null) {
+      solrZkClient.close();
+      solrZkClient = null;
+    }
+    if (zkTestServer != null) {
+      zkTestServer.shutdown();
+      zkTestServer = null;
+    }
+  }
+
+  /** Formats {@code i} as a sequential-node suffix: zero-padded to 10 digits. */
+  private static String seq(int i) {
+    return String.format(Locale.ROOT, "%010d", i);
+  }
+
+  @Test
+  public void testHasData() throws Exception {
+    assertFalse(stateManager.hasData("/hasData/foo"));
+    assertFalse(stateManager.hasData("/hasData/bar"));
+    try {
+      // the parent path does not exist yet, so this must fail
+      stateManager.createData("/hasData/foo", new byte[0], CreateMode.PERSISTENT);
+      fail("should not succeed");
+    } catch (NoSuchElementException e) {
+      // expected
+    }
+    stateManager.makePath("/hasData");
+    stateManager.createData("/hasData/foo", new byte[0], CreateMode.PERSISTENT);
+    stateManager.createData("/hasData/bar", new byte[0], CreateMode.PERSISTENT);
+    assertTrue(stateManager.hasData("/hasData/foo"));
+    assertTrue(stateManager.hasData("/hasData/bar"));
+  }
+
+  @Test
+  public void testListData() throws Exception {
+    assertFalse(stateManager.hasData("/listData/foo"));
+    assertFalse(stateManager.hasData("/listData/foo/bar"));
+    try {
+      stateManager.createData("/listData/foo/bar", new byte[0], CreateMode.PERSISTENT);
+      fail("should not succeed");
+    } catch (NoSuchElementException e) {
+      // expected
+    }
+    try {
+      stateManager.listData("/listData/foo");
+      fail("should not succeed");
+    } catch (NoSuchElementException e) {
+      // expected
+    }
+    stateManager.makePath("/listData");
+    List<String> kids = stateManager.listData("/listData");
+    assertEquals(0, kids.size());
+    stateManager.makePath("/listData/foo");
+    kids = stateManager.listData("/listData");
+    assertEquals(1, kids.size());
+    assertEquals("foo", kids.get(0));
+    stateManager.createData("/listData/foo/bar", new byte[0], CreateMode.PERSISTENT);
+    stateManager.createData("/listData/foo/baz", new byte[0], CreateMode.PERSISTENT);
+    kids = stateManager.listData("/listData/foo");
+    assertEquals(2, kids.size());
+    assertTrue(kids.contains("bar"));
+    assertTrue(kids.contains("baz"));
+    try {
+      stateManager.createData("/listData/foo/bar", new byte[0], CreateMode.PERSISTENT);
+      fail("should not succeed");
+    } catch (AlreadyExistsException e) {
+      // expected
+    }
+  }
+
+  static final byte[] firstData = new byte[] {
+      (byte)0xca, (byte)0xfe, (byte)0xba, (byte)0xbe
+  };
+
+  static final byte[] secondData = new byte[] {
+      (byte)0xbe, (byte)0xba, (byte)0xfe, (byte)0xca
+  };
+
+  @Test
+  public void testCreateMode() throws Exception {
+    stateManager.makePath("/createMode");
+    stateManager.createData("/createMode/persistent", firstData, CreateMode.PERSISTENT);
+    stateManager.createData("/createMode/persistent_seq", firstData, CreateMode.PERSISTENT);
+    for (int i = 0; i < 10; i++) {
+      stateManager.createData("/createMode/persistent_seq/data", firstData, CreateMode.PERSISTENT_SEQUENTIAL);
+    }
+    // check what happens with gaps
+    stateManager.createData("/createMode/persistent_seq/data", firstData, CreateMode.PERSISTENT_SEQUENTIAL);
+    stateManager.removeData("/createMode/persistent_seq/data" + seq(10), -1);
+    stateManager.createData("/createMode/persistent_seq/data", firstData, CreateMode.PERSISTENT_SEQUENTIAL);
+
+    stateManager.createData("/createMode/ephemeral", firstData, CreateMode.EPHEMERAL);
+    stateManager.createData("/createMode/ephemeral_seq", firstData, CreateMode.PERSISTENT);
+    for (int i = 0; i < 10; i++) {
+      stateManager.createData("/createMode/ephemeral_seq/data", firstData, CreateMode.EPHEMERAL_SEQUENTIAL);
+    }
+    assertTrue(stateManager.hasData("/createMode"));
+    assertTrue(stateManager.hasData("/createMode/persistent"));
+    assertTrue(stateManager.hasData("/createMode/ephemeral"));
+    List<String> kids = stateManager.listData("/createMode/persistent_seq");
+    assertEquals(11, kids.size());
+    kids = stateManager.listData("/createMode/ephemeral_seq");
+    assertEquals(10, kids.size());
+    for (int i = 0; i < 10; i++) {
+      assertTrue(stateManager.hasData("/createMode/persistent_seq/data" + seq(i)));
+    }
+    // the removed node leaves a gap; numbering continues after it
+    assertFalse(stateManager.hasData("/createMode/persistent_seq/data" + seq(10)));
+    assertTrue(stateManager.hasData("/createMode/persistent_seq/data" + seq(11)));
+
+    for (int i = 0; i < 10; i++) {
+      assertTrue(stateManager.hasData("/createMode/ephemeral_seq/data" + seq(i)));
+    }
+    // check that ephemeral nodes disappear on disconnect
+    reInit();
+    assertTrue(stateManager.hasData("/createMode/persistent"));
+    for (int i = 0; i < 10; i++) {
+      assertTrue(stateManager.hasData("/createMode/persistent_seq/data" + seq(i)));
+    }
+    assertTrue(stateManager.hasData("/createMode/persistent_seq/data" + seq(11)));
+
+    assertFalse(stateManager.hasData("/createMode/ephemeral"));
+    assertTrue(stateManager.hasData("/createMode/ephemeral_seq"));
+    kids = stateManager.listData("/createMode/ephemeral_seq");
+    assertEquals(0, kids.size());
+  }
+
+  /** A watcher that records its event and fails if it is triggered more than once. */
+  static class OnceWatcher implements Watcher {
+    final CountDownLatch triggered = new CountDownLatch(1);
+    volatile WatchedEvent event;
+
+    @Override
+    public void process(WatchedEvent event) {
+      if (triggered.getCount() == 0) {
+        fail("Watch was already triggered once!");
+      }
+      // record the event before releasing the latch so that a thread returning from
+      // triggered.await() is guaranteed to observe it
+      this.event = event;
+      triggered.countDown();
+    }
+  }
+
+  @Test
+  public void testGetSetRemoveData() throws Exception {
+    stateManager.makePath("/getData");
+    stateManager.createData("/getData/persistentData", firstData, CreateMode.PERSISTENT);
+    OnceWatcher nodeWatcher = new OnceWatcher();
+    VersionedData vd = stateManager.getData("/getData/persistentData", nodeWatcher);
+    assertNotNull(vd);
+    assertEquals(0, vd.getVersion());
+    assertTrue(Arrays.equals(firstData, vd.getData()));
+
+    // update data, test versioning
+    try {
+      stateManager.setData("/getData/persistentData", secondData, 1);
+      fail("should have failed");
+    } catch (BadVersionException e) {
+      // expected
+    }
+    // watch should not have fired
+    assertEquals(1, nodeWatcher.triggered.getCount());
+
+    stateManager.setData("/getData/persistentData", secondData, 0);
+    if (!nodeWatcher.triggered.await(5, TimeUnit.SECONDS)) {
+      fail("Node watch should have fired!");
+    }
+    // watch should not fire now because it needs to be reset
+    stateManager.setData("/getData/persistentData", secondData, -1);
+
+    nodeWatcher = new OnceWatcher();
+    stateManager.createData("/getData/ephemeralData", firstData, CreateMode.EPHEMERAL);
+    // only the watch registration matters here; the returned data is not inspected
+    stateManager.getData("/getData/ephemeralData", nodeWatcher);
+    reInit();
+    if (!nodeWatcher.triggered.await(5, TimeUnit.SECONDS)) {
+      fail("Node watch should have fired!");
+    }
+    assertTrue(stateManager.hasData("/getData/persistentData"));
+    assertFalse(stateManager.hasData("/getData/ephemeralData"));
+
+    nodeWatcher = new OnceWatcher();
+    vd = stateManager.getData("/getData/persistentData", nodeWatcher);
+    // try wrong version
+    try {
+      stateManager.removeData("/getData/persistentData", vd.getVersion() - 1);
+      fail("should have failed");
+    } catch (BadVersionException e) {
+      // expected
+    }
+    // watch should not have fired
+    assertEquals(1, nodeWatcher.triggered.getCount());
+
+    stateManager.removeData("/getData/persistentData", vd.getVersion());
+    if (!nodeWatcher.triggered.await(5, TimeUnit.SECONDS)) {
+      fail("Node watch should have fired!");
+    }
+  }
+
+  @Test
+  public void testMulti() throws Exception {
+    // TODO: placeholder -- no assertions yet
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestExecutePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestExecutePlanAction.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestExecutePlanAction.java
new file mode 100644
index 0000000..18d76dc
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestExecutePlanAction.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Lists;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
+import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.autoscaling.ActionContext;
+import org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest;
+import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
+import org.apache.solr.cloud.autoscaling.NodeLostTrigger;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.LogLevel;
+import org.apache.solr.common.util.TimeSource;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test for {@link ExecutePlanAction}
+ */
+@LogLevel("org.apache.solr.cloud=DEBUG")
+public class TestExecutePlanAction extends SimSolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final int NODE_COUNT = 2;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(NODE_COUNT, TimeSource.get("simTime:50"));
+ }
+
+  /**
+   * Tops the simulated cluster back up to {@link #NODE_COUNT} live nodes, in case a previous
+   * test left fewer running.
+   */
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+
+    if (cluster.getClusterStateProvider().getLiveNodes().size() >= NODE_COUNT) {
+      return; // already at (or above) the expected size
+    }
+    // start some to get to original state
+    int liveCount = cluster.getClusterStateProvider().getLiveNodes().size();
+    int missing = NODE_COUNT - liveCount;
+    for (int started = 0; started < missing; started++) {
+      cluster.simAddNode();
+    }
+  }
+
+  /** Logs the simulated node values, live nodes and each collection's state after every test. */
+  @After
+  public void printState() throws Exception {
+    log.info("-------------_ FINAL STATE --------------");
+    String nodeValuesJson = Utils.toJSONString(cluster.getSimNodeStateProvider().simGetAllNodeValues());
+    log.info("* Node values: " + nodeValuesJson);
+    log.info("* Live nodes: " + cluster.getClusterStateProvider().getLiveNodes());
+    ClusterState finalState = cluster.getClusterStateProvider().getClusterState();
+    for (String collectionName : cluster.getSimClusterStateProvider().simListCollections()) {
+      DocCollection collectionState = finalState.getCollection(collectionName);
+      log.info("* Collection " + collectionName + " state: " + collectionState);
+    }
+  }
+
+ /**
+  * Creates a 1x2 collection, then runs {@link ExecutePlanAction} directly with two operations:
+  * a MOVEREPLICA of one replica from a random node to another live node, and a mock
+  * OVERSEERSTATUS request that inspects the action's state znode when its async id is set.
+  * Checks that two responses are recorded and that the first one succeeded.
+  */
+ @Test
+ public void testExecute() throws Exception {
+   SolrClient solrClient = cluster.simGetSolrClient();
+   String collectionName = "testExecute";
+   CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
+       "conf", 1, 2);
+   create.setMaxShardsPerNode(1);
+   create.process(solrClient);
+
+   log.info("Collection ready after " + waitForState(collectionName, 120, TimeUnit.SECONDS, clusterShape(1, 2)) + "ms");
+
+   String sourceNodeName = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+   ClusterState clusterState = cluster.getClusterStateProvider().getClusterState();
+   DocCollection docCollection = clusterState.getCollection(collectionName);
+   List<Replica> replicas = docCollection.getReplicas(sourceNodeName);
+   assertNotNull(replicas);
+   assertFalse(replicas.isEmpty());
+
+   List<String> otherNodes = cluster.getClusterStateProvider().getLiveNodes().stream()
+       .filter(node -> !node.equals(sourceNodeName)).collect(Collectors.toList());
+   assertFalse(otherNodes.isEmpty());
+   String survivor = otherNodes.get(0);
+
+   try (ExecutePlanAction action = new ExecutePlanAction()) {
+     action.init(Collections.singletonMap("name", "execute_plan"));
+
+     // used to signal if we found that ExecutePlanAction did in fact create the right znode before executing the operation
+     AtomicBoolean znodeCreated = new AtomicBoolean(false);
+
+     CollectionAdminRequest.AsyncCollectionAdminRequest moveReplica = new CollectionAdminRequest.MoveReplica(collectionName, replicas.get(0).getName(), survivor);
+     CollectionAdminRequest.AsyncCollectionAdminRequest mockRequest = new CollectionAdminRequest.AsyncCollectionAdminRequest(CollectionParams.CollectionAction.OVERSEERSTATUS) {
+       @Override
+       public void setAsyncId(String asyncId) {
+         super.setAsyncId(asyncId);
+         String parentPath = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/xyz/execute_plan";
+         try {
+           if (cluster.getDistribStateManager().hasData(parentPath)) {
+             // Fully qualified on purpose: inside this CollectionAdminRequest subclass the
+             // inherited member class CollectionAdminRequest.List shadows java.util.List.
+             java.util.List<String> children = cluster.getDistribStateManager().listData(parentPath);
+             if (!children.isEmpty()) {
+               String child = children.get(0);
+               VersionedData data = cluster.getDistribStateManager().getData(parentPath + "/" + child);
+               Map<?, ?> m = (Map<?, ?>) Utils.fromJSON(data.getData());
+               if (m.containsKey("requestid")) {
+                 znodeCreated.set(m.get("requestid").equals(asyncId));
+               }
+             }
+           }
+         } catch (Exception e) {
+           throw new RuntimeException(e);
+         }
+       }
+     };
+     List<CollectionAdminRequest.AsyncCollectionAdminRequest> operations = Lists.asList(moveReplica, new CollectionAdminRequest.AsyncCollectionAdminRequest[]{mockRequest});
+     NodeLostTrigger.NodeLostEvent nodeLostEvent = new NodeLostTrigger.NodeLostEvent(TriggerEventType.NODELOST,
+         "mock_trigger_name", Collections.singletonList(TimeSource.CURRENT_TIME.getTime()),
+         Collections.singletonList(sourceNodeName));
+     ActionContext actionContext = new ActionContext(cluster, null,
+         new HashMap<>(Collections.singletonMap("operations", operations)));
+     action.process(nodeLostEvent, actionContext);
+
+     // NOTE(review): this assertion is disabled, so znodeCreated is computed but never checked.
+     // Confirm whether the simulated state manager is expected to hold the requestid here.
+//    assertTrue("ExecutePlanAction should have stored the requestid in ZK before executing the request", znodeCreated.get());
+     @SuppressWarnings("unchecked") // the "responses" property is a list of NamedList responses
+     List<NamedList<Object>> responses = (List<NamedList<Object>>) actionContext.getProperty("responses");
+     assertNotNull(responses);
+     assertEquals(2, responses.size());
+     NamedList<Object> response = responses.get(0);
+     assertNull(response.get("failure"));
+     assertNotNull(response.get("success"));
+   }
+
+   log.info("Collection ready after " + waitForState(collectionName, 300, TimeUnit.SECONDS, clusterShape(1, 2)) + "ms");
+ }
+
+ /**
+  * Registers a nodeLost trigger with compute/execute plan actions and creates a 1x2 collection.
+  * It then removes a random node that hosts a replica and verifies that the collection recovers
+  * to two replicas, both located on the first remaining live node.
+  */
+ @Test
+ public void testIntegration() throws Exception {
+   SolrClient solrClient = cluster.simGetSolrClient();
+
+   String setTriggerCommand = "{" +
+       "'set-trigger' : {" +
+       "'name' : 'node_lost_trigger'," +
+       "'event' : 'nodeLost'," +
+       "'waitFor' : '1s'," +
+       "'enabled' : true," +
+       "'actions' : [{'name':'compute_plan', 'class' : 'solr.ComputePlanAction'}," +
+       "{'name':'execute_plan','class':'solr.ExecutePlanAction'}]" +
+       "}}";
+   SolrRequest req = AutoScalingHandlerTest.createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+   NamedList<Object> response = solrClient.request(req);
+   // expected value first, so a failure message reports expected/actual correctly
+   assertEquals("success", response.get("result").toString());
+
+   String collectionName = "testIntegration";
+   CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
+       "conf", 1, 2);
+   create.setMaxShardsPerNode(1);
+   create.process(solrClient);
+
+   waitForState("Timed out waiting for replicas of new collection to be active",
+       collectionName, clusterShape(1, 2));
+
+   String sourceNodeName = cluster.getSimClusterStateProvider().simGetRandomNode(random());
+   ClusterState clusterState = cluster.getClusterStateProvider().getClusterState();
+   DocCollection docCollection = clusterState.getCollection(collectionName);
+   List<Replica> replicas = docCollection.getReplicas(sourceNodeName);
+   assertNotNull(replicas);
+   assertFalse(replicas.isEmpty());
+
+   List<String> otherNodes = cluster.getClusterStateProvider().getLiveNodes().stream()
+       .filter(node -> !node.equals(sourceNodeName)).collect(Collectors.toList());
+   assertFalse(otherNodes.isEmpty());
+   String survivor = otherNodes.get(0);
+
+   cluster.simRemoveNode(sourceNodeName, false);
+
+   waitForState("Timed out waiting for replicas of collection to be 2 again",
+       collectionName, clusterShape(1, 2));
+
+   clusterState = cluster.getClusterStateProvider().getClusterState();
+   docCollection = clusterState.getCollection(collectionName);
+   List<Replica> replicasOnSurvivor = docCollection.getReplicas(survivor);
+   assertNotNull(replicasOnSurvivor);
+   assertEquals(2, replicasOnSurvivor.size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestGenericDistributedQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestGenericDistributedQueue.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestGenericDistributedQueue.java
new file mode 100644
index 0000000..cba700b
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestGenericDistributedQueue.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling.sim;
+
+import org.apache.solr.client.solrj.cloud.DistributedQueue;
+import org.apache.solr.client.solrj.cloud.autoscaling.DistribStateManager;
+
+/**
+ * Variant of {@link TestSimDistributedQueue} whose {@link #makeDistributedQueue(String)} returns a
+ * {@link GenericDistributedQueue} backed by a {@link SimDistribStateManager}.
+ */
+public class TestGenericDistributedQueue extends TestSimDistributedQueue {
+  /** State manager used by every queue this test instance creates; assigned once. */
+  final DistribStateManager stateManager = new SimDistribStateManager();
+
+  @Override
+  protected DistributedQueue makeDistributedQueue(String dqZNode) throws Exception {
+    return new GenericDistributedQueue(stateManager, dqZNode);
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/d6d2e3b2/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
new file mode 100644
index 0000000..034a039
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling.sim;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.autoscaling.ActionContext;
+import org.apache.solr.cloud.autoscaling.ComputePlanAction;
+import org.apache.solr.cloud.autoscaling.ExecutePlanAction;
+import org.apache.solr.cloud.autoscaling.TriggerActionBase;
+import org.apache.solr.cloud.autoscaling.TriggerEvent;
+import org.apache.solr.cloud.autoscaling.TriggerListenerBase;
+import org.apache.solr.cloud.autoscaling.CapturedEvent;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.util.LogLevel;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+
+/**
+ * Autoscaling tests against a simulated cluster of {@link #NUM_NODES} nodes. The cluster uses the
+ * {@code simTime:SPEED} time source (presumably simulated time running {@link #SPEED} times
+ * faster than wall-clock time; confirm in TimeSource).
+ */
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG")
+public class TestLargeCluster extends SimSolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** Multiplier passed to the {@code simTime} time source. */
+  public static final int SPEED = 50;
+
+  /** Number of live nodes the cluster is brought back to before each test. */
+  public static final int NUM_NODES = 100;
+
+  /** Events captured by {@link TestTriggerListener}, keyed by listener name. */
+  static Map<String, List<CapturedEvent>> listenerEvents = new ConcurrentHashMap<>();
+  /** Number of times {@link TestTriggerAction} has been invoked. */
+  static AtomicInteger triggerFiredCount = new AtomicInteger();
+  /** Counted down each time {@link TestTriggerAction} runs; re-created (count 1) before each test. */
+  static CountDownLatch triggerFiredLatch;
+  /** Randomized trigger {@code waitFor}, in seconds (1 to 3). */
+  static int waitForSeconds;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(NUM_NODES, TimeSource.get("simTime:" + SPEED));
+  }
+
+  @Before
+  public void setupTest() throws Exception {
+
+    waitForSeconds = 1 + random().nextInt(3);
+    triggerFiredCount.set(0);
+    triggerFiredLatch = new CountDownLatch(1);
+    listenerEvents.clear();
+    while (cluster.getClusterStateProvider().getLiveNodes().size() < NUM_NODES) {
+      // perhaps a test stopped a node but didn't start it back
+      // lets start a node
+      cluster.simAddNode();
+    }
+  }
+
+  /** Records every event it receives into {@link #listenerEvents} under its configured name. */
+  public static class TestTriggerListener extends TriggerListenerBase {
+    @Override
+    public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) {
+      super.init(cloudManager, config);
+    }
+
+    @Override
+    public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
+                                     ActionContext context, Throwable error, String message) {
+      List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
+      lst.add(new CapturedEvent(cluster.getTimeSource().getTime(), context, config, stage, actionName, event, message));
+    }
+  }
+
+  /** Counts invocations and releases {@link #triggerFiredLatch}. */
+  public static class TestTriggerAction extends TriggerActionBase {
+    @Override
+    public void process(TriggerEvent event, ActionContext context) throws Exception {
+      triggerFiredCount.incrementAndGet();
+      triggerFiredLatch.countDown();
+    }
+  }
+
+  @Test
+  public void testBasic() throws Exception {
+    SolrClient solrClient = cluster.simGetSolrClient();
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_lost_trigger'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : true," +
+        "'actions' : [" +
+        "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
+        "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
+        "{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}" +
+        "]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    String setListenerCommand = "{" +
+        "'set-listener' : " +
+        "{" +
+        "'name' : 'foo'," +
+        "'trigger' : 'node_lost_trigger'," +
+        "'stage' : ['STARTED','ABORTED','SUCCEEDED', 'FAILED']," +
+        "'beforeAction' : ['compute', 'execute']," +
+        "'afterAction' : ['compute', 'execute']," +
+        "'class' : '" + TestTriggerListener.class.getName() + "'" +
+        "}" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+    response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    cluster.getTimeSource().sleep(5000);
+
+    // take the first limit + 1 live nodes in iteration order (the loop only stops once the list
+    // exceeds limit), then shuffle them so the nodes killed / made flaky below are random
+    List<String> nodes = new ArrayList<>();
+    int limit = 75;
+    for (String node : cluster.getClusterStateProvider().getLiveNodes()) {
+      nodes.add(node);
+      if (nodes.size() > limit) {
+        break;
+      }
+    }
+    Collections.shuffle(nodes, random());
+    // create collection on these nodes
+    String collectionName = "testBasic";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
+        "conf", 5, 5, 5, 5);
+    create.setMaxShardsPerNode(1);
+    create.setCreateNodeSet(String.join(",", nodes));
+    create.process(solrClient);
+
+    log.info("Ready after " + waitForState(collectionName, 30 * nodes.size(), TimeUnit.SECONDS, clusterShape(5, 15)) + "ms");
+
+    int KILL_NODES = 8;
+    // kill off a number of nodes
+    for (int i = 0; i < KILL_NODES; i++) {
+      cluster.simRemoveNode(nodes.get(i), false);
+    }
+    // should fully recover
+    log.info("Ready after " + waitForState(collectionName, 90 * KILL_NODES, TimeUnit.SECONDS, clusterShape(5, 15)) + "ms");
+
+    log.info("OP COUNTS: " + cluster.simGetOpCounts());
+    long moveReplicaOps = cluster.simGetOpCount(CollectionParams.CollectionAction.MOVEREPLICA.name());
+
+    // simulate a number of flaky nodes
+    int FLAKY_NODES = 10;
+    // long: count() returns long, and "int +=" would narrow it silently
+    long flakyReplicas = 0;
+    for (int cnt = 0; cnt < 10; cnt++) {
+      for (int i = KILL_NODES; i < KILL_NODES + FLAKY_NODES; i++) {
+        flakyReplicas += cluster.getSimClusterStateProvider().simGetReplicaInfos(nodes.get(i))
+            .stream().filter(r -> r.getState().equals(Replica.State.ACTIVE)).count();
+        cluster.simRemoveNode(nodes.get(i), false);
+      }
+      cluster.getTimeSource().sleep(TimeUnit.SECONDS.toMillis(waitForSeconds) * 2);
+      for (int i = KILL_NODES; i < KILL_NODES + FLAKY_NODES; i++) {
+        final String nodeId = nodes.get(i);
+        cluster.submit(() -> cluster.getSimClusterStateProvider().simRestoreNode(nodeId));
+      }
+    }
+
+    log.info("Ready after " + waitForState(collectionName, 30 * nodes.size(), TimeUnit.SECONDS, clusterShape(5, 15)) + "ms");
+    log.info("OP COUNTS: " + cluster.simGetOpCounts());
+    long newMoveReplicaOps = cluster.simGetOpCount(CollectionParams.CollectionAction.MOVEREPLICA.name());
+    log.info("==== Flaky replicas: {}. Additional MOVEREPLICA count: {}", flakyReplicas, (newMoveReplicaOps - moveReplicaOps));
+    // flaky nodes lead to a number of MOVEREPLICA that is non-zero but lower than the number of flaky replicas
+    assertTrue("there should be new MOVEREPLICA ops", newMoveReplicaOps - moveReplicaOps > 0);
+    assertTrue("there should be less than flakyReplicas=" + flakyReplicas + " MOVEREPLICA ops",
+        newMoveReplicaOps - moveReplicaOps < flakyReplicas);
+  }
+
+  @Test
+  @AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-11714")
+  public void testSearchRate() throws Exception {
+    SolrClient solrClient = cluster.simGetSolrClient();
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'search_rate_trigger'," +
+        "'event' : 'searchRate'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'rate' : 1.0," +
+        "'enabled' : true," +
+        "'actions' : [" +
+        "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
+        "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
+        "{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}" +
+        "]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+    String setListenerCommand1 = "{" +
+        "'set-listener' : " +
+        "{" +
+        "'name' : 'srt'," +
+        "'trigger' : 'search_rate_trigger'," +
+        "'stage' : ['FAILED','SUCCEEDED']," +
+        "'class' : '" + TestTriggerListener.class.getName() + "'" +
+        "}" +
+        "}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
+    response = solrClient.request(req);
+    assertEquals("success", response.get("result").toString());
+
+    String collectionName = "testSearchRate";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
+        "conf", 2, 10);
+    create.process(solrClient);
+
+    log.info("Ready after " + waitForState(collectionName, 300, TimeUnit.SECONDS, clusterShape(2, 10)) + " ms");
+
+    // collect the node names
+    Set<String> nodes = new HashSet<>();
+    cluster.getSimClusterStateProvider().getClusterState().getCollection(collectionName)
+        .getReplicas()
+        .forEach(r -> nodes.add(r.getNodeName()));
+
+    String metricName = "QUERY./select.requestTimes:1minRate";
+    // simulate search traffic
+    cluster.getSimClusterStateProvider().simSetShardValue(collectionName, "shard1", metricName, 40, true);
+
+    // wait (in wall-clock ms, scaled down by SPEED) for TestTriggerAction to release the latch
+    boolean await = triggerFiredLatch.await(20000 / SPEED, TimeUnit.MILLISECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    // wait for listener to capture the SUCCEEDED stage
+    cluster.getTimeSource().sleep(2000);
+    List<CapturedEvent> srtEvents = listenerEvents.get("srt");
+    assertNotNull("no events captured by listener 'srt': " + listenerEvents, srtEvents);
+    assertEquals(listenerEvents.toString(), 1, srtEvents.size());
+  }
+}