You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2018/03/30 14:03:53 UTC
[2/6] lucene-solr:branch_7x: SOLR-12152: Split up
TriggerIntegrationTest into multiple tests to isolate and increase
reliability
SOLR-12152: Split up TriggerIntegrationTest into multiple tests to isolate and increase reliability
(cherry picked from commit ed9e5eb)
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/8533a46c
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/8533a46c
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/8533a46c
Branch: refs/heads/branch_7x
Commit: 8533a46cc4588e20e55b4e9e5f64699cd1bb0273
Parents: 12106f0
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Fri Mar 30 11:08:56 2018 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Fri Mar 30 19:32:43 2018 +0530
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../MetricTriggerIntegrationTest.java | 242 +++++
.../NodeAddedTriggerIntegrationTest.java | 300 ++++++
.../NodeLostTriggerIntegrationTest.java | 322 ++++++
.../NodeMarkersRegistrationTest.java | 269 +++++
.../ScheduledTriggerIntegrationTest.java | 141 +++
.../SearchRateTriggerIntegrationTest.java | 217 ++++
.../TriggerCooldownIntegrationTest.java | 238 +++++
.../autoscaling/TriggerIntegrationTest.java | 1006 +-----------------
.../TriggerSetPropertiesIntegrationTest.java | 195 ++++
10 files changed, 1929 insertions(+), 1003 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8533a46c/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 17f30b1..8696913 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -108,6 +108,8 @@ Other Changes
* SOLR-12118: Solr Ref-Guide can now use some ivy version props directly as attributes in content (hossman)
+* SOLR-12152: Split up TriggerIntegrationTest into multiple tests to isolate and increase reliability. (shalin)
+
================== 7.3.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8533a46c/solr/core/src/test/org/apache/solr/cloud/autoscaling/MetricTriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/MetricTriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/MetricTriggerIntegrationTest.java
new file mode 100644
index 0000000..7b6da5a
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/MetricTriggerIntegrationTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.metrics.SolrCoreMetricManager;
+import org.apache.solr.util.LogLevel;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
+import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.timeSource;
+
+/**
+ * Integration test for {@link MetricTrigger}
+ */
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
+public class MetricTriggerIntegrationTest extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ static Map<String, List<CapturedEvent>> listenerEvents = new HashMap<>();
+ static CountDownLatch listenerCreated = new CountDownLatch(1);
+ private static CountDownLatch triggerFiredLatch;
+ private static int waitForSeconds = 1;
+ private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(2)
+ .addConfig("conf", configset("cloud-minimal"))
+ .configure();
+ // disable .scheduled_maintenance
+ String suspendTriggerCommand = "{" +
+ "'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
+ "}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+ SolrClient solrClient = cluster.getSolrClient();
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+ triggerFiredLatch = new CountDownLatch(1);
+ }
+
+ @Test
+ public void testMetricTrigger() throws Exception {
+ cluster.waitForAllNodes(5);
+
+ String collectionName = "testMetricTrigger";
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
+ "conf", 2, 2).setMaxShardsPerNode(2);
+ create.process(solrClient);
+ solrClient.setDefaultCollection(collectionName);
+
+ waitForState("Timed out waiting for collection:" + collectionName + " to become active", collectionName, clusterShape(2, 2));
+
+ DocCollection docCollection = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
+ String shardId = "shard1";
+ Replica replica = docCollection.getSlice(shardId).getReplicas().iterator().next();
+ String coreName = replica.getCoreName();
+ String replicaName = Utils.parseMetricsReplicaName(collectionName, coreName);
+ long waitForSeconds = 2 + random().nextInt(5);
+ String registry = SolrCoreMetricManager.createRegistryName(true, collectionName, shardId, replicaName, null);
+ String tag = "metrics:" + registry + ":INDEX.sizeInBytes";
+
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'metric_trigger'," +
+ "'event' : 'metric'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'enabled' : true," +
+ "'metric': '" + tag + "'" +
+ "'above' : 100.0," +
+ "'collection': '" + collectionName + "'" +
+ "'shard':'" + shardId + "'" +
+ "'actions' : [" +
+ "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
+ "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
+ "{'name':'test','class':'" + MetricAction.class.getName() + "'}" +
+ "]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ String setListenerCommand1 = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'srt'," +
+ "'trigger' : 'metric_trigger'," +
+ "'stage' : ['FAILED','SUCCEEDED']," +
+ "'afterAction': ['compute', 'execute', 'test']," +
+ "'class' : '" + TestTriggerListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // start more nodes so that we have at least 4
+ for (int i = cluster.getJettySolrRunners().size(); i < 4; i++) {
+ cluster.startJettySolrRunner();
+ }
+ cluster.waitForAllNodes(10);
+
+ List<SolrInputDocument> docs = new ArrayList<>(500);
+ for (int i = 0; i < 500; i++) {
+ docs.add(new SolrInputDocument("id", String.valueOf(i), "x_s", "x" + i));
+ }
+ solrClient.add(docs);
+ solrClient.commit();
+
+ boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ // wait for listener to capture the SUCCEEDED stage
+ Thread.sleep(2000);
+ assertEquals(listenerEvents.toString(), 4, listenerEvents.get("srt").size());
+ CapturedEvent ev = listenerEvents.get("srt").get(0);
+ long now = timeSource.getTimeNs();
+ // verify waitFor
+ assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
+ assertEquals(collectionName, ev.event.getProperties().get("collection"));
+
+ // find a new replica and create its metric name
+ docCollection = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
+ replica = docCollection.getSlice(shardId).getReplicas().iterator().next();
+ coreName = replica.getCoreName();
+ replicaName = Utils.parseMetricsReplicaName(collectionName, coreName);
+ registry = SolrCoreMetricManager.createRegistryName(true, collectionName, shardId, replicaName, null);
+ tag = "metrics:" + registry + ":INDEX.sizeInBytes";
+
+ triggerFiredLatch = new CountDownLatch(1);
+ listenerEvents.clear();
+
+ setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'metric_trigger'," +
+ "'event' : 'metric'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'enabled' : true," +
+ "'metric': '" + tag + "'" +
+ "'above' : 100.0," +
+ "'collection': '" + collectionName + "'" +
+ "'shard':'" + shardId + "'" +
+ "'preferredOperation':'addreplica'" +
+ "'actions' : [" +
+ "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
+ "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
+ "{'name':'test','class':'" + MetricAction.class.getName() + "'}" +
+ "]" +
+ "}}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ // wait for listener to capture the SUCCEEDED stage
+ Thread.sleep(2000);
+ assertEquals(listenerEvents.toString(), 4, listenerEvents.get("srt").size());
+ ev = listenerEvents.get("srt").get(0);
+ now = timeSource.getTimeNs();
+ // verify waitFor
+ assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
+ assertEquals(collectionName, ev.event.getProperties().get("collection"));
+ docCollection = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
+ assertEquals(5, docCollection.getReplicas().size());
+ }
+
+ public static class MetricAction extends TriggerActionBase {
+
+ @Override
+ public void process(TriggerEvent event, ActionContext context) throws Exception {
+ try {
+ events.add(event);
+ long currentTimeNanos = timeSource.getTimeNs();
+ long eventTimeNanos = event.getEventTime();
+ long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+ if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+ fail(event.source + " was fired before the configured waitFor period");
+ }
+ triggerFiredLatch.countDown();
+ } catch (Throwable t) {
+ log.debug("--throwable", t);
+ throw t;
+ }
+ }
+ }
+
+ public static class TestTriggerListener extends TriggerListenerBase {
+ @Override
+ public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) {
+ super.init(cloudManager, config);
+ listenerCreated.countDown();
+ }
+
+ @Override
+ public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
+ ActionContext context, Throwable error, String message) {
+ List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
+ lst.add(new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8533a46c/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerIntegrationTest.java
new file mode 100644
index 0000000..ecf2437
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerIntegrationTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.LogLevel;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+import static org.apache.solr.cloud.autoscaling.ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS;
+import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
+import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
+
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
+public class NodeAddedTriggerIntegrationTest extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static CountDownLatch actionConstructorCalled;
+ private static CountDownLatch actionInitCalled;
+ private static CountDownLatch triggerFiredLatch;
+ private static int waitForSeconds = 1;
+ private static AtomicBoolean triggerFired;
+ private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
+ private static SolrCloudManager cloudManager;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(2)
+ .addConfig("conf", configset("cloud-minimal"))
+ .configure();
+ // disable .scheduled_maintenance
+ String suspendTriggerCommand = "{" +
+ "'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
+ "}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+ SolrClient solrClient = cluster.getSolrClient();
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+ }
+
+ private static CountDownLatch getTriggerFiredLatch() {
+ return triggerFiredLatch;
+ }
+
+ @Before
+ public void setupTest() throws Exception {
+ // ensure that exactly 2 jetty nodes are running
+ int numJetties = cluster.getJettySolrRunners().size();
+ log.info("Found {} jetty instances running", numJetties);
+ for (int i = 2; i < numJetties; i++) {
+ int r = random().nextInt(cluster.getJettySolrRunners().size());
+ log.info("Shutdown extra jetty instance at port {}", cluster.getJettySolrRunner(r).getLocalPort());
+ cluster.stopJettySolrRunner(r);
+ }
+ for (int i = cluster.getJettySolrRunners().size(); i < 2; i++) {
+ // start jetty instances
+ cluster.startJettySolrRunner();
+ }
+ cluster.waitForAllNodes(5);
+
+ NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+ String overseerLeader = (String) overSeerStatus.get("leader");
+ int overseerLeaderIndex = 0;
+ for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+ JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+ if (jetty.getNodeName().equals(overseerLeader)) {
+ overseerLeaderIndex = i;
+ break;
+ }
+ }
+ Overseer overseer = cluster.getJettySolrRunner(overseerLeaderIndex).getCoreContainer().getZkController().getOverseer();
+ ScheduledTriggers scheduledTriggers = ((OverseerTriggerThread) overseer.getTriggerThread().getThread()).getScheduledTriggers();
+ // aggressively remove all active scheduled triggers
+ scheduledTriggers.removeAll();
+
+ // clear any persisted auto scaling configuration
+ Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
+ log.info(SOLR_AUTOSCALING_CONF_PATH + " reset, new znode version {}", stat.getVersion());
+
+ cluster.deleteAllCollections();
+ cluster.getSolrClient().setDefaultCollection(null);
+
+ // restart Overseer. Even though we reset the autoscaling config some already running
+ // trigger threads may still continue to execute and produce spurious events
+ cluster.stopJettySolrRunner(overseerLeaderIndex);
+ Thread.sleep(5000);
+
+ waitForSeconds = 1 + random().nextInt(3);
+ actionConstructorCalled = new CountDownLatch(1);
+ actionInitCalled = new CountDownLatch(1);
+ triggerFiredLatch = new CountDownLatch(1);
+ triggerFired = new AtomicBoolean(false);
+ events.clear();
+
+ while (cluster.getJettySolrRunners().size() < 2) {
+ // perhaps a test stopped a node but didn't start it back
+ // lets start a node
+ cluster.startJettySolrRunner();
+ }
+
+ cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
+ // clear any events or markers
+ // todo: consider the impact of such cleanup on regular cluster restarts
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
+ }
+
+ private void deleteChildrenRecursively(String path) throws Exception {
+ cloudManager.getDistribStateManager().removeRecursively(path, true, false);
+ }
+
+ @Test
+ public void testNodeAddedTriggerRestoreState() throws Exception {
+ // for this test we want to update the trigger so we must assert that the actions were created twice
+ actionInitCalled = new CountDownLatch(2);
+
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ waitForSeconds = 5;
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_added_restore_trigger'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor' : '5s'," + // should be enough for us to update the trigger
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS, cloudManager.getTimeSource());
+ while (actionInitCalled.getCount() == 0 && !timeOut.hasTimedOut()) {
+ Thread.sleep(200);
+ }
+ assertTrue("The action specified in node_added_restore_trigger was not instantiated even after 2 seconds", actionInitCalled.getCount() > 0);
+
+ // start a new node
+ JettySolrRunner newNode = cluster.startJettySolrRunner();
+
+ // ensure that the old trigger sees the new node, todo find a better way to do this
+ Thread.sleep(500 + TimeUnit.SECONDS.toMillis(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS));
+
+ waitForSeconds = 0;
+ setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_added_restore_trigger'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor' : '0s'," + // update a property so that it replaces the old trigger, also we want it to fire immediately
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+ "}}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // wait until the second instance of action is created
+ if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
+ fail("Two TriggerAction instances should have been created by now");
+ }
+
+ boolean await = triggerFiredLatch.await(5, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ assertTrue(triggerFired.get());
+ NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
+ assertNotNull(nodeAddedEvent);
+ List<String> nodeNames = (List<String>) nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
+ assertTrue(nodeNames.contains(newNode.getNodeName()));
+ }
+
+ @Test
+ public void testNodeAddedTrigger() throws Exception {
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_added_trigger'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
+ fail("The TriggerAction should have been created by now");
+ }
+
+ JettySolrRunner newNode = cluster.startJettySolrRunner();
+ boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ assertTrue(triggerFired.get());
+ NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
+ assertNotNull(nodeAddedEvent);
+ List<String> nodeNames = (List<String>) nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
+ assertTrue(nodeNames.contains(newNode.getNodeName()));
+
+ // reset
+ actionConstructorCalled = new CountDownLatch(1);
+ actionInitCalled = new CountDownLatch(1);
+
+ // update the trigger with exactly the same data
+ setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_added_trigger'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+ "}}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // this should be a no-op so the action should have been created but init should not be called
+ if (!actionConstructorCalled.await(3, TimeUnit.SECONDS)) {
+ fail("The TriggerAction should have been created by now");
+ }
+
+ assertFalse(actionInitCalled.await(2, TimeUnit.SECONDS));
+ }
+
+ public static class TestTriggerAction extends TriggerActionBase {
+
+ public TestTriggerAction() {
+ actionConstructorCalled.countDown();
+ }
+
+ @Override
+ public void process(TriggerEvent event, ActionContext actionContext) {
+ try {
+ if (triggerFired.compareAndSet(false, true)) {
+ events.add(event);
+ long currentTimeNanos = TriggerIntegrationTest.timeSource.getTimeNs();
+ long eventTimeNanos = event.getEventTime();
+ long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+ if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+ fail(event.source + " was fired before the configured waitFor period");
+ }
+ getTriggerFiredLatch().countDown();
+ } else {
+ fail(event.source + " was fired more than once!");
+ }
+ } catch (Throwable t) {
+ log.debug("--throwable", t);
+ throw t;
+ }
+ }
+
+ @Override
+ public void init(Map<String, String> args) {
+ log.info("TestTriggerAction init");
+ actionInitCalled.countDown();
+ super.init(args);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8533a46c/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerIntegrationTest.java
new file mode 100644
index 0000000..6b1af65
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerIntegrationTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.LogLevel;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+import static org.apache.solr.cloud.autoscaling.ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS;
+import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
+import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
+
+/**
+ * Integration tests for the {@code nodeLost} autoscaling trigger: verifies that the
+ * trigger fires after a node is stopped, that its state is restored across a trigger
+ * update, and that re-posting an identical trigger config is a no-op.
+ * Split out of TriggerIntegrationTest (SOLR-12152).
+ */
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
+public class NodeLostTriggerIntegrationTest extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ // latches/flags shared between the test methods and the TestTriggerAction instances
+ // that the autoscaling framework constructs reflectively
+ private static CountDownLatch actionConstructorCalled;
+ private static CountDownLatch actionInitCalled;
+ private static CountDownLatch triggerFiredLatch;
+ private static int waitForSeconds = 1;
+ private static AtomicBoolean triggerFired;
+ private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
+ private static SolrCloudManager cloudManager;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(2)
+ .addConfig("conf", configset("cloud-minimal"))
+ .configure();
+ // disable .scheduled_maintenance
+ String suspendTriggerCommand = "{" +
+ "'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
+ "}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+ SolrClient solrClient = cluster.getSolrClient();
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+ }
+
+ private static CountDownLatch getTriggerFiredLatch() {
+ return triggerFiredLatch;
+ }
+
+ // Resets the cluster to exactly 2 nodes, clears persisted autoscaling state in ZK,
+ // restarts the Overseer, and re-initializes the shared latches before each test.
+ @Before
+ public void setupTest() throws Exception {
+ // ensure that exactly 2 jetty nodes are running
+ int numJetties = cluster.getJettySolrRunners().size();
+ log.info("Found {} jetty instances running", numJetties);
+ for (int i = 2; i < numJetties; i++) {
+ int r = random().nextInt(cluster.getJettySolrRunners().size());
+ log.info("Shutdown extra jetty instance at port {}", cluster.getJettySolrRunner(r).getLocalPort());
+ cluster.stopJettySolrRunner(r);
+ }
+ for (int i = cluster.getJettySolrRunners().size(); i < 2; i++) {
+ // start jetty instances
+ cluster.startJettySolrRunner();
+ }
+ cluster.waitForAllNodes(5);
+
+ // locate the jetty hosting the Overseer leader so its scheduled triggers can be removed
+ NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+ String overseerLeader = (String) overSeerStatus.get("leader");
+ int overseerLeaderIndex = 0;
+ for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+ JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+ if (jetty.getNodeName().equals(overseerLeader)) {
+ overseerLeaderIndex = i;
+ break;
+ }
+ }
+ Overseer overseer = cluster.getJettySolrRunner(overseerLeaderIndex).getCoreContainer().getZkController().getOverseer();
+ ScheduledTriggers scheduledTriggers = ((OverseerTriggerThread) overseer.getTriggerThread().getThread()).getScheduledTriggers();
+ // aggressively remove all active scheduled triggers
+ scheduledTriggers.removeAll();
+
+ // clear any persisted auto scaling configuration
+ Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
+ log.info(SOLR_AUTOSCALING_CONF_PATH + " reset, new znode version {}", stat.getVersion());
+
+ cluster.deleteAllCollections();
+ cluster.getSolrClient().setDefaultCollection(null);
+
+ // restart Overseer. Even though we reset the autoscaling config some already running
+ // trigger threads may still continue to execute and produce spurious events
+ cluster.stopJettySolrRunner(overseerLeaderIndex);
+ Thread.sleep(5000);
+
+ // randomize waitFor so tests exercise different periods; reset shared state
+ waitForSeconds = 1 + random().nextInt(3);
+ actionConstructorCalled = new CountDownLatch(1);
+ actionInitCalled = new CountDownLatch(1);
+ triggerFiredLatch = new CountDownLatch(1);
+ triggerFired = new AtomicBoolean(false);
+ events.clear();
+
+ while (cluster.getJettySolrRunners().size() < 2) {
+ // perhaps a test stopped a node but didn't start it back
+ // lets start a node
+ cluster.startJettySolrRunner();
+ }
+
+ cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
+ // clear any events or markers
+ // todo: consider the impact of such cleanup on regular cluster restarts
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
+ }
+
+ // removes all children of the given ZK path while keeping the path node itself
+ private void deleteChildrenRecursively(String path) throws Exception {
+ cloudManager.getDistribStateManager().removeRecursively(path, true, false);
+ }
+
+ @Test
+ public void testNodeLostTriggerRestoreState() throws Exception {
+ // for this test we want to update the trigger so we must assert that the actions were created twice
+ actionInitCalled = new CountDownLatch(2);
+
+ // start a new node
+ JettySolrRunner newNode = cluster.startJettySolrRunner();
+ String nodeName = newNode.getNodeName();
+
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ waitForSeconds = 5;
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_lost_restore_trigger'," +
+ "'event' : 'nodeLost'," +
+ "'waitFor' : '5s'," + // should be enough for us to update the trigger
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS, cloudManager.getTimeSource());
+ // NOTE(review): the latch starts at 2, so `getCount() == 0` is false on entry and this
+ // loop never waits; was a condition like `> 1` (wait for the first init) intended? verify
+ while (actionInitCalled.getCount() == 0 && !timeOut.hasTimedOut()) {
+ Thread.sleep(200);
+ }
+ assertTrue("The action specified in node_lost_restore_trigger was not instantiated even after 2 seconds", actionInitCalled.getCount() > 0);
+
+ // find the index of the node we just started so we can stop exactly that one
+ List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
+ int index = -1;
+ for (int i = 0; i < jettySolrRunners.size(); i++) {
+ JettySolrRunner runner = jettySolrRunners.get(i);
+ if (runner == newNode) index = i;
+ }
+ assertFalse(index == -1);
+ cluster.stopJettySolrRunner(index);
+
+ // ensure that the old trigger sees the stopped node, todo find a better way to do this
+ Thread.sleep(500 + TimeUnit.SECONDS.toMillis(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS));
+
+ waitForSeconds = 0;
+ setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_lost_restore_trigger'," +
+ "'event' : 'nodeLost'," +
+ "'waitFor' : '0s'," + // update a property so that it replaces the old trigger, also we want it to fire immediately
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+ "}}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // wait until the second instance of action is created
+ if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
+ fail("Two TriggerAction instances should have been created by now");
+ }
+
+ // the replacement trigger must restore the pending nodeLost state and fire for the node
+ // that was lost while the previous trigger instance was active
+ boolean await = triggerFiredLatch.await(5, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ assertTrue(triggerFired.get());
+ NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) events.iterator().next();
+ assertNotNull(nodeLostEvent);
+ List<String> nodeNames = (List<String>) nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
+ assertTrue(nodeNames.contains(nodeName));
+ }
+
+ @Test
+ public void testNodeLostTrigger() throws Exception {
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_lost_trigger'," +
+ "'event' : 'nodeLost'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+ "}}";
+ // pick a node that is NOT the Overseer leader to stop, so the trigger thread survives
+ NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+ String overseerLeader = (String) overSeerStatus.get("leader");
+ int nonOverseerLeaderIndex = 0;
+ for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+ JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+ if (!jetty.getNodeName().equals(overseerLeader)) {
+ nonOverseerLeaderIndex = i;
+ }
+ }
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
+ fail("The TriggerAction should have been created by now");
+ }
+
+ triggerFired.set(false);
+ triggerFiredLatch = new CountDownLatch(1);
+ String lostNodeName = cluster.getJettySolrRunner(nonOverseerLeaderIndex).getNodeName();
+ cluster.stopJettySolrRunner(nonOverseerLeaderIndex);
+ boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ assertTrue(triggerFired.get());
+ NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) events.iterator().next();
+ assertNotNull(nodeLostEvent);
+ List<String> nodeNames = (List<String>) nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
+ assertTrue(nodeNames.contains(lostNodeName));
+
+ // reset
+ actionConstructorCalled = new CountDownLatch(1);
+ actionInitCalled = new CountDownLatch(1);
+
+ // update the trigger with exactly the same data
+ setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_lost_trigger'," +
+ "'event' : 'nodeLost'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+ "}}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // this should be a no-op so the action should have been created but init should not be called
+ if (!actionConstructorCalled.await(3, TimeUnit.SECONDS)) {
+ fail("The TriggerAction should have been created by now");
+ }
+
+ assertFalse(actionInitCalled.await(2, TimeUnit.SECONDS));
+ }
+
+ /**
+ * Test action that records a single trigger firing for the assertions above; see the
+ * shared static latches at the top of the class.
+ */
+ public static class TestTriggerAction extends TriggerActionBase {
+
+ public TestTriggerAction() {
+ // signal to the test that the trigger has instantiated its action
+ actionConstructorCalled.countDown();
+ }
+
+ @Override
+ public void process(TriggerEvent event, ActionContext actionContext) {
+ try {
+ // only the first firing is recorded; any second firing fails the test
+ if (triggerFired.compareAndSet(false, true)) {
+ events.add(event);
+ long currentTimeNanos = TriggerIntegrationTest.timeSource.getTimeNs();
+ long eventTimeNanos = event.getEventTime();
+ // WAIT_FOR_DELTA_NANOS gives a small tolerance around the waitFor check
+ long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+ if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+ fail(event.source + " was fired before the configured waitFor period");
+ }
+ getTriggerFiredLatch().countDown();
+ } else {
+ fail(event.source + " was fired more than once!");
+ }
+ } catch (Throwable t) {
+ // log and rethrow so the failure surfaces on the trigger thread as well
+ log.debug("--throwable", t);
+ throw t;
+ }
+ }
+
+ @Override
+ public void init(Map<String, String> args) {
+ log.info("TestTriggerAction init");
+ actionInitCalled.countDown();
+ super.init(args);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8533a46c/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeMarkersRegistrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeMarkersRegistrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeMarkersRegistrationTest.java
new file mode 100644
index 0000000..38c2165
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeMarkersRegistrationTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.LiveNodesListener;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.util.LogLevel;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+
+/**
+ * Verifies creation and cleanup of nodeAdded/nodeLost marker znodes under the
+ * autoscaling ZK paths: markers must not be created when no trigger exists, must be
+ * consumed by a matching trigger, and must survive an Overseer restart long enough
+ * for the new Overseer's triggers to process them. Split out of TriggerIntegrationTest
+ * (SOLR-12152).
+ */
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
+@LuceneTestCase.BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-12028")
+public class NodeMarkersRegistrationTest extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ // latches/state shared between the test and the reflectively-constructed actions
+ private static CountDownLatch actionInitCalled;
+ private static CountDownLatch triggerFiredLatch;
+ private static CountDownLatch actionConstructorCalled;
+ private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
+ private static ZkStateReader zkStateReader;
+ // guards against concurrent action execution, which the framework promises not to do
+ private static ReentrantLock lock = new ReentrantLock();
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(2)
+ .addConfig("conf", configset("cloud-minimal"))
+ .configure();
+ zkStateReader = cluster.getSolrClient().getZkStateReader();
+ // disable .scheduled_maintenance
+ String suspendTriggerCommand = "{" +
+ "'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
+ "}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+ SolrClient solrClient = cluster.getSolrClient();
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+ }
+
+ private static CountDownLatch getTriggerFiredLatch() {
+ return triggerFiredLatch;
+ }
+
+ @Test
+ public void testNodeMarkersRegistration() throws Exception {
+ // for this test we want to create two triggers so we must assert that the actions were created twice
+ actionInitCalled = new CountDownLatch(2);
+ // similarly we want both triggers to fire
+ triggerFiredLatch = new CountDownLatch(2);
+ actionConstructorCalled = new CountDownLatch(1);
+ TestLiveNodesListener listener = registerLiveNodesListener();
+
+ // locate the jetty currently hosting the Overseer leader
+ NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+ String overseerLeader = (String) overSeerStatus.get("leader");
+ int overseerLeaderIndex = 0;
+ for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+ JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+ if (jetty.getNodeName().equals(overseerLeader)) {
+ overseerLeaderIndex = i;
+ break;
+ }
+ }
+ // add a node
+ JettySolrRunner node = cluster.startJettySolrRunner();
+ if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+ fail("onChange listener didn't execute on cluster change");
+ }
+ assertEquals(1, listener.addedNodes.size());
+ assertEquals(node.getNodeName(), listener.addedNodes.iterator().next());
+ // verify that a znode doesn't exist (no trigger)
+ String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node.getNodeName();
+ assertFalse("Path " + pathAdded + " was created but there are no nodeAdded triggers", zkClient().exists(pathAdded, true));
+ listener.reset();
+ // stop overseer
+ log.info("====== KILL OVERSEER 1");
+ cluster.stopJettySolrRunner(overseerLeaderIndex);
+ if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+ fail("onChange listener didn't execute on cluster change");
+ }
+ assertEquals(1, listener.lostNodes.size());
+ assertEquals(overseerLeader, listener.lostNodes.iterator().next());
+ assertEquals(0, listener.addedNodes.size());
+ // wait until the new overseer is up
+ Thread.sleep(5000);
+ // verify that a znode does NOT exist - there's no nodeLost trigger,
+ // so the new overseer cleaned up existing nodeLost markers
+ String pathLost = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + overseerLeader;
+ assertFalse("Path " + pathLost + " exists", zkClient().exists(pathLost, true));
+
+ listener.reset();
+
+ // set up triggers
+ CloudSolrClient solrClient = cluster.getSolrClient();
+
+ log.info("====== ADD TRIGGERS");
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_added_triggerMR'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor' : '1s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_lost_triggerMR'," +
+ "'event' : 'nodeLost'," +
+ "'waitFor' : '1s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
+ "}}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // re-discover the Overseer leader: it may have moved after the earlier kill
+ overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+ overseerLeader = (String) overSeerStatus.get("leader");
+ overseerLeaderIndex = 0;
+ for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+ JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+ if (jetty.getNodeName().equals(overseerLeader)) {
+ overseerLeaderIndex = i;
+ break;
+ }
+ }
+
+ // create another node
+ log.info("====== ADD NODE 1");
+ JettySolrRunner node1 = cluster.startJettySolrRunner();
+ if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+ fail("onChange listener didn't execute on cluster change");
+ }
+ assertEquals(1, listener.addedNodes.size());
+ assertEquals(node1.getNodeName(), listener.addedNodes.iterator().next());
+ // verify that a znode exists
+ pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node1.getNodeName();
+ assertTrue("Path " + pathAdded + " wasn't created", zkClient().exists(pathAdded, true));
+
+ Thread.sleep(5000);
+ // nodeAdded marker should be consumed now by nodeAdded trigger
+ assertFalse("Path " + pathAdded + " should have been deleted", zkClient().exists(pathAdded, true));
+
+ listener.reset();
+ events.clear();
+ triggerFiredLatch = new CountDownLatch(1);
+ // kill overseer again
+ log.info("====== KILL OVERSEER 2");
+ cluster.stopJettySolrRunner(overseerLeaderIndex);
+ if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+ fail("onChange listener didn't execute on cluster change");
+ }
+
+
+ // this time a nodeLost trigger exists, so the new Overseer must process the
+ // nodeLost marker left behind by the killed Overseer
+ if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
+ fail("Trigger should have fired by now");
+ }
+ assertEquals(1, events.size());
+ TriggerEvent ev = events.iterator().next();
+ List<String> nodeNames = (List<String>) ev.getProperty(TriggerEvent.NODE_NAMES);
+ assertTrue(nodeNames.contains(overseerLeader));
+ assertEquals(TriggerEventType.NODELOST, ev.getEventType());
+ }
+
+ private TestLiveNodesListener registerLiveNodesListener() {
+ TestLiveNodesListener listener = new TestLiveNodesListener();
+ zkStateReader.registerLiveNodesListener(listener);
+ return listener;
+ }
+
+ // records live-node additions/removals observed via ZkStateReader callbacks
+ private static class TestLiveNodesListener implements LiveNodesListener {
+ Set<String> lostNodes = new HashSet<>();
+ Set<String> addedNodes = new HashSet<>();
+ CountDownLatch onChangeLatch = new CountDownLatch(1);
+
+ public void reset() {
+ lostNodes.clear();
+ addedNodes.clear();
+ onChangeLatch = new CountDownLatch(1);
+ }
+
+ @Override
+ public void onChange(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes) {
+ onChangeLatch.countDown();
+ Set<String> old = new HashSet<>(oldLiveNodes);
+ old.removeAll(newLiveNodes);
+ if (!old.isEmpty()) {
+ lostNodes.addAll(old);
+ }
+ // NOTE(review): this mutates the newLiveNodes argument in place — assumes the
+ // caller hands each listener a private copy; verify against ZkStateReader
+ newLiveNodes.removeAll(oldLiveNodes);
+ if (!newLiveNodes.isEmpty()) {
+ addedNodes.addAll(newLiveNodes);
+ }
+ }
+ }
+
+ /**
+ * Test action that records every trigger event (unlike single-fire actions in sibling
+ * tests) and verifies actions are never executed concurrently via a tryLock.
+ */
+ public static class TestEventMarkerAction extends TriggerActionBase {
+
+ public TestEventMarkerAction() {
+ actionConstructorCalled.countDown();
+ }
+
+ @Override
+ public void process(TriggerEvent event, ActionContext actionContext) {
+ boolean locked = lock.tryLock();
+ if (!locked) {
+ log.info("We should never have a tryLock fail because actions are never supposed to be executed concurrently");
+ return;
+ }
+ try {
+ events.add(event);
+ getTriggerFiredLatch().countDown();
+ } catch (Throwable t) {
+ log.debug("--throwable", t);
+ throw t;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void init(Map<String, String> args) {
+ log.info("TestEventMarkerAction init");
+ actionInitCalled.countDown();
+ super.init(args);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8533a46c/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledTriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledTriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledTriggerIntegrationTest.java
new file mode 100644
index 0000000..24e7420
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledTriggerIntegrationTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.util.LogLevel;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+
+/**
+ * Integration test for {@link ScheduledTrigger}: creates a policy violation, registers
+ * a scheduled trigger with compute/execute plan actions, and asserts the resulting move
+ * operation targets the newly added node.
+ */
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
+@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 26-Mar-2018
+public class ScheduledTriggerIntegrationTest extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ // shared between the test and the reflectively-constructed recorder action
+ private static CountDownLatch triggerFiredLatch;
+ private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
+ private static AtomicReference<Map<String, Object>> actionContextPropertiesRef = new AtomicReference<>();
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(2)
+ .addConfig("conf", configset("cloud-minimal"))
+ .configure();
+ // disable .scheduled_maintenance
+ String suspendTriggerCommand = "{" +
+ "'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
+ "}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+ SolrClient solrClient = cluster.getSolrClient();
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+ triggerFiredLatch = new CountDownLatch(1);
+ }
+
+ @Test
+ public void testScheduledTrigger() throws Exception {
+ CloudSolrClient solrClient = cluster.getSolrClient();
+
+ // this collection will place 2 cores on 1st node and 1 core on 2nd node
+ String collectionName = "testScheduledTrigger";
+ CollectionAdminRequest.createCollection(collectionName, 1, 3)
+ .setMaxShardsPerNode(5).process(solrClient);
+ waitForState("", collectionName, clusterShape(1, 3));
+
+ // create a policy which allows only 1 core per node thereby creating a violation for the above collection
+ String setClusterPolicy = "{\n" +
+ " \"set-cluster-policy\" : [\n" +
+ " {\"cores\" : \"<2\", \"node\" : \"#EACH\"}\n" +
+ " ]\n" +
+ "}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicy);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // start a new node which can be used to balance the cluster as per policy
+ JettySolrRunner newNode = cluster.startJettySolrRunner();
+ cluster.waitForAllNodes(10);
+
+ // NOTE(review): there are no commas between the 'startTime', 'every' and 'actions'
+ // entries below — this relies on lenient JSON parsing on the server; verify intended
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'sched_trigger_integration1'," +
+ "'event' : 'scheduled'," +
+ "'startTime' : '" + new Date().toInstant().toString() + "'" +
+ "'every' : '+3SECONDS'" +
+ "'actions' : [" +
+ "{'name' : 'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
+ "{'name' : 'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
+ "{'name' : 'recorder', 'class': '" + ContextPropertiesRecorderAction.class.getName() + "'}" +
+ "]}}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ assertTrue("ScheduledTrigger did not fire within 20 seconds", triggerFiredLatch.await(20, TimeUnit.SECONDS));
+ assertEquals(1, events.size());
+ Map<String, Object> actionContextProps = actionContextPropertiesRef.get();
+ assertNotNull(actionContextProps);
+ // NOTE(review): 'event' is assigned but never asserted on
+ TriggerEvent event = events.iterator().next();
+ // the computed plan must move a replica to the newly started node
+ List<SolrRequest> operations = (List<SolrRequest>) actionContextProps.get("operations");
+ assertNotNull(operations);
+ assertEquals(1, operations.size());
+ for (SolrRequest operation : operations) {
+ SolrParams params = operation.getParams();
+ assertEquals(newNode.getNodeName(), params.get("targetNode"));
+ }
+ }
+
+ // records the action context properties so the test can inspect the computed operations
+ public static class ContextPropertiesRecorderAction extends TriggerActionBase {
+ @Override
+ public void process(TriggerEvent event, ActionContext actionContext) {
+ actionContextPropertiesRef.set(actionContext.getProperties());
+ try {
+ events.add(event);
+ triggerFiredLatch.countDown();
+ } catch (Throwable t) {
+ log.debug("--throwable", t);
+ throw t;
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8533a46c/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java
new file mode 100644
index 0000000..547be5c
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.AtomicDouble;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.util.LogLevel;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
+import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.timeSource;
+
+/**
+ * Integration test for {@link SearchRateTrigger}
+ */
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
+@LuceneTestCase.BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-12028")
+public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static CountDownLatch triggerFiredLatch = new CountDownLatch(1);
+ private static CountDownLatch listenerCreated = new CountDownLatch(1);
+ private static int waitForSeconds = 1;
+ private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
+ private static Map<String, List<CapturedEvent>> listenerEvents = new HashMap<>();
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(5)
+ .addConfig("conf", configset("cloud-minimal"))
+ .configure();
+ // disable .scheduled_maintenance
+ String suspendTriggerCommand = "{" +
+ "'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
+ "}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+ SolrClient solrClient = cluster.getSolrClient();
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+ }
+
+ @Test
+ public void testSearchRate() throws Exception {
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ String COLL1 = "collection1";
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
+ "conf", 1, 2);
+ create.process(solrClient);
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'search_rate_trigger'," +
+ "'event' : 'searchRate'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'enabled' : true," +
+ "'rate' : 1.0," +
+ "'actions' : [" +
+ "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
+ "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
+ "{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" +
+ "]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ String setListenerCommand1 = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'srt'," +
+ "'trigger' : 'search_rate_trigger'," +
+ "'stage' : ['FAILED','SUCCEEDED']," +
+ "'afterAction': ['compute', 'execute', 'test']," +
+ "'class' : '" + TestTriggerListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+ SolrParams query = params(CommonParams.Q, "*:*");
+ for (int i = 0; i < 500; i++) {
+ solrClient.query(COLL1, query);
+ }
+ boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ // wait for listener to capture the SUCCEEDED stage
+ Thread.sleep(5000);
+ List<CapturedEvent> events = listenerEvents.get("srt");
+ assertEquals(listenerEvents.toString(), 4, events.size());
+ assertEquals("AFTER_ACTION", events.get(0).stage.toString());
+ assertEquals("compute", events.get(0).actionName);
+ assertEquals("AFTER_ACTION", events.get(1).stage.toString());
+ assertEquals("execute", events.get(1).actionName);
+ assertEquals("AFTER_ACTION", events.get(2).stage.toString());
+ assertEquals("test", events.get(2).actionName);
+ assertEquals("SUCCEEDED", events.get(3).stage.toString());
+ assertNull(events.get(3).actionName);
+
+ CapturedEvent ev = events.get(0);
+ long now = timeSource.getTimeNs();
+ // verify waitFor
+ assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
+ Map<String, Double> nodeRates = (Map<String, Double>) ev.event.getProperties().get("node");
+ assertNotNull("nodeRates", nodeRates);
+ assertTrue(nodeRates.toString(), nodeRates.size() > 0);
+ AtomicDouble totalNodeRate = new AtomicDouble();
+ nodeRates.forEach((n, r) -> totalNodeRate.addAndGet(r));
+ List<ReplicaInfo> replicaRates = (List<ReplicaInfo>) ev.event.getProperties().get("replica");
+ assertNotNull("replicaRates", replicaRates);
+ assertTrue(replicaRates.toString(), replicaRates.size() > 0);
+ AtomicDouble totalReplicaRate = new AtomicDouble();
+ replicaRates.forEach(r -> {
+ assertTrue(r.toString(), r.getVariable("rate") != null);
+ totalReplicaRate.addAndGet((Double) r.getVariable("rate"));
+ });
+ Map<String, Object> shardRates = (Map<String, Object>) ev.event.getProperties().get("shard");
+ assertNotNull("shardRates", shardRates);
+ assertEquals(shardRates.toString(), 1, shardRates.size());
+ shardRates = (Map<String, Object>) shardRates.get(COLL1);
+ assertNotNull("shardRates", shardRates);
+ assertEquals(shardRates.toString(), 1, shardRates.size());
+ AtomicDouble totalShardRate = new AtomicDouble();
+ shardRates.forEach((s, r) -> totalShardRate.addAndGet((Double) r));
+ Map<String, Double> collectionRates = (Map<String, Double>) ev.event.getProperties().get("collection");
+ assertNotNull("collectionRates", collectionRates);
+ assertEquals(collectionRates.toString(), 1, collectionRates.size());
+ Double collectionRate = collectionRates.get(COLL1);
+ assertNotNull(collectionRate);
+ assertTrue(collectionRate > 5.0);
+ assertEquals(collectionRate, totalNodeRate.get(), 5.0);
+ assertEquals(collectionRate, totalShardRate.get(), 5.0);
+ assertEquals(collectionRate, totalReplicaRate.get(), 5.0);
+
+ // check operations
+ List<Map<String, Object>> ops = (List<Map<String, Object>>) ev.context.get("properties.operations");
+ assertNotNull(ops);
+ assertTrue(ops.size() > 1);
+ for (Map<String, Object> m : ops) {
+ assertEquals("ADDREPLICA", m.get("params.action"));
+ }
+ }
+
+ public static class TestSearchRateAction extends TriggerActionBase {
+
+ @Override
+ public void process(TriggerEvent event, ActionContext context) throws Exception {
+ try {
+ events.add(event);
+ long currentTimeNanos = timeSource.getTimeNs();
+ long eventTimeNanos = event.getEventTime();
+ long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+ if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+ fail(event.source + " was fired before the configured waitFor period");
+ }
+ triggerFiredLatch.countDown();
+ } catch (Throwable t) {
+ log.debug("--throwable", t);
+ throw t;
+ }
+ }
+ }
+
+ public static class TestTriggerListener extends TriggerListenerBase {
+ @Override
+ public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) {
+ super.init(cloudManager, config);
+ listenerCreated.countDown();
+ }
+
+ @Override
+ public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
+ ActionContext context, Throwable error, String message) {
+ List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
+ lst.add(new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8533a46c/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerCooldownIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerCooldownIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerCooldownIntegrationTest.java
new file mode 100644
index 0000000..8d69bad
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerCooldownIntegrationTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.params.AutoScalingParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.util.LogLevel;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
+import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.timeSource;
+
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
+public class TriggerCooldownIntegrationTest extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ static Map<String, List<CapturedEvent>> listenerEvents = new HashMap<>();
+ static CountDownLatch listenerCreated = new CountDownLatch(1);
+ static boolean failDummyAction = false;
+ private static CountDownLatch actionConstructorCalled = new CountDownLatch(1);
+ private static CountDownLatch actionInitCalled = new CountDownLatch(1);
+ private static CountDownLatch triggerFiredLatch = new CountDownLatch(1);
+ private static int waitForSeconds = 1;
+ private static AtomicBoolean triggerFired = new AtomicBoolean();
+ private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(2)
+ .addConfig("conf", configset("cloud-minimal"))
+ .configure();
+ // disable .scheduled_maintenance
+ String suspendTriggerCommand = "{" +
+ "'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
+ "}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+ SolrClient solrClient = cluster.getSolrClient();
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+ }
+
+ @Test
+ public void testCooldown() throws Exception {
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ failDummyAction = false;
+ waitForSeconds = 1;
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_added_cooldown_trigger'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'enabled' : true," +
+ "'actions' : [" +
+ "{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}" +
+ "]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ String setListenerCommand1 = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'bar'," +
+ "'trigger' : 'node_added_cooldown_trigger'," +
+ "'stage' : ['FAILED','SUCCEEDED', 'IGNORED']," +
+ "'class' : '" + TestTriggerListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ listenerCreated = new CountDownLatch(1);
+ listenerEvents.clear();
+
+ JettySolrRunner newNode = cluster.startJettySolrRunner();
+ boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ assertTrue(triggerFired.get());
+ // wait for listener to capture the SUCCEEDED stage
+ Thread.sleep(1000);
+
+ List<CapturedEvent> capturedEvents = listenerEvents.get("bar");
+ // we may get a few IGNORED events if other tests caused events within cooldown period
+ assertTrue(capturedEvents.toString(), capturedEvents.size() > 0);
+ long prevTimestamp = capturedEvents.get(capturedEvents.size() - 1).timestamp;
+
+ // reset the trigger and captured events
+ listenerEvents.clear();
+ triggerFiredLatch = new CountDownLatch(1);
+ triggerFired.compareAndSet(true, false);
+
+ JettySolrRunner newNode2 = cluster.startJettySolrRunner();
+ await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ // wait for listener to capture the SUCCEEDED stage
+ Thread.sleep(2000);
+
+ // there must be at least one IGNORED event due to cooldown, and one SUCCEEDED event
+ capturedEvents = listenerEvents.get("bar");
+ assertEquals(capturedEvents.toString(), 1, capturedEvents.size());
+ CapturedEvent ev = capturedEvents.get(0);
+ assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
+ // the difference between timestamps of the first SUCCEEDED and the last SUCCEEDED
+ // must be larger than cooldown period
+ assertTrue("timestamp delta is less than default cooldown period", ev.timestamp - prevTimestamp > TimeUnit.SECONDS.toNanos(ScheduledTriggers.DEFAULT_COOLDOWN_PERIOD_SECONDS));
+ prevTimestamp = ev.timestamp;
+
+ // this also resets the cooldown period
+ long modifiedCooldownPeriodSeconds = 7;
+ String setPropertiesCommand = "{\n" +
+ "\t\"set-properties\" : {\n" +
+ "\t\t\"" + AutoScalingParams.TRIGGER_COOLDOWN_PERIOD_SECONDS + "\" : " + modifiedCooldownPeriodSeconds + "\n" +
+ "\t}\n" +
+ "}";
+ solrClient.request(createAutoScalingRequest(SolrRequest.METHOD.POST, setPropertiesCommand));
+ req = createAutoScalingRequest(SolrRequest.METHOD.GET, null);
+ response = solrClient.request(req);
+
+ // reset the trigger and captured events
+ listenerEvents.clear();
+ triggerFiredLatch = new CountDownLatch(1);
+ triggerFired.compareAndSet(true, false);
+
+ JettySolrRunner newNode3 = cluster.startJettySolrRunner();
+ await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ triggerFiredLatch = new CountDownLatch(1);
+ triggerFired.compareAndSet(true, false);
+ // add another node
+ JettySolrRunner newNode4 = cluster.startJettySolrRunner();
+ await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ // wait for listener to capture the SUCCEEDED stage
+ Thread.sleep(2000);
+
+ // there must be two SUCCEEDED (due to newNode3 and newNode4) and maybe some ignored events
+ capturedEvents = listenerEvents.get("bar");
+ assertTrue(capturedEvents.toString(), capturedEvents.size() >= 2);
+ // first event should be SUCCEEDED
+ ev = capturedEvents.get(0);
+ assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
+
+ ev = capturedEvents.get(capturedEvents.size() - 1);
+ assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
+ // the difference between timestamps of the first SUCCEEDED and the last SUCCEEDED
+ // must be larger than the modified cooldown period
+ assertTrue("timestamp delta is less than default cooldown period", ev.timestamp - prevTimestamp > TimeUnit.SECONDS.toNanos(modifiedCooldownPeriodSeconds));
+ }
+
+ public static class TestTriggerAction extends TriggerActionBase {
+
+ public TestTriggerAction() {
+ actionConstructorCalled.countDown();
+ }
+
+ @Override
+ public void process(TriggerEvent event, ActionContext actionContext) {
+ try {
+ if (triggerFired.compareAndSet(false, true)) {
+ events.add(event);
+ long currentTimeNanos = timeSource.getTimeNs();
+ long eventTimeNanos = event.getEventTime();
+ long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+ if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+ fail(event.source + " was fired before the configured waitFor period");
+ }
+ triggerFiredLatch.countDown();
+ } else {
+ fail(event.source + " was fired more than once!");
+ }
+ } catch (Throwable t) {
+ log.debug("--throwable", t);
+ throw t;
+ }
+ }
+
+ @Override
+ public void init(Map<String, String> args) {
+ log.info("TestTriggerAction init");
+ actionInitCalled.countDown();
+ super.init(args);
+ }
+ }
+
+ public static class TestTriggerListener extends TriggerListenerBase {
+ @Override
+ public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) {
+ super.init(cloudManager, config);
+ listenerCreated.countDown();
+ }
+
+ @Override
+ public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
+ ActionContext context, Throwable error, String message) {
+ List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
+ lst.add(new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message));
+ }
+ }
+}