You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2018/03/30 14:03:52 UTC
[1/6] lucene-solr:branch_7x: SOLR-12152: Split up TriggerIntegrationTest into multiple tests to isolate and increase reliability
Repository: lucene-solr
Updated Branches:
refs/heads/branch_7x 12106f000 -> 83f77bcb4
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8533a46c/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index 15f65e7..f536633 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -19,52 +19,34 @@ package org.apache.solr.cloud.autoscaling;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.SortedSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
-import com.google.common.util.concurrent.AtomicDouble;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
-import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
-import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.SolrCloudTestCase;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.LiveNodesListener;
-import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.AutoScalingParams;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
-import org.apache.solr.core.SolrResourceLoader;
-import org.apache.solr.metrics.SolrCoreMetricManager;
import org.apache.solr.util.LogLevel;
-import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -73,7 +55,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
-import static org.apache.solr.cloud.autoscaling.ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS;
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
/**
@@ -92,20 +73,18 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
private static CountDownLatch actionCompleted;
private static AtomicBoolean triggerFired;
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
- private static ZkStateReader zkStateReader;
private static SolrCloudManager cloudManager;
// use the same time source as triggers use
- private static final TimeSource timeSource = TimeSource.CURRENT_TIME;
+ static final TimeSource timeSource = TimeSource.CURRENT_TIME;
- private static final long WAIT_FOR_DELTA_NANOS = TimeUnit.MILLISECONDS.toNanos(5);
+ static final long WAIT_FOR_DELTA_NANOS = TimeUnit.MILLISECONDS.toNanos(5);
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(2)
.addConfig("conf", configset("cloud-minimal"))
.configure();
- zkStateReader = cluster.getSolrClient().getZkStateReader();
// disable .scheduled_maintenance
String suspendTriggerCommand = "{" +
"'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
@@ -344,250 +323,6 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
@Test
- public void testNodeLostTriggerRestoreState() throws Exception {
- // for this test we want to update the trigger so we must assert that the actions were created twice
- TriggerIntegrationTest.actionInitCalled = new CountDownLatch(2);
-
- // start a new node
- JettySolrRunner newNode = cluster.startJettySolrRunner();
- String nodeName = newNode.getNodeName();
-
- CloudSolrClient solrClient = cluster.getSolrClient();
- waitForSeconds = 5;
- String setTriggerCommand = "{" +
- "'set-trigger' : {" +
- "'name' : 'node_lost_restore_trigger'," +
- "'event' : 'nodeLost'," +
- "'waitFor' : '5s'," + // should be enough for us to update the trigger
- "'enabled' : true," +
- "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
- "}}";
- SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
- NamedList<Object> response = solrClient.request(req);
- assertEquals(response.get("result").toString(), "success");
-
- TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS, cloudManager.getTimeSource());
- while (actionInitCalled.getCount() == 0 && !timeOut.hasTimedOut()) {
- Thread.sleep(200);
- }
- assertTrue("The action specified in node_lost_restore_trigger was not instantiated even after 2 seconds", actionInitCalled.getCount() > 0);
-
- List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
- int index = -1;
- for (int i = 0; i < jettySolrRunners.size(); i++) {
- JettySolrRunner runner = jettySolrRunners.get(i);
- if (runner == newNode) index = i;
- }
- assertFalse(index == -1);
- cluster.stopJettySolrRunner(index);
-
- // ensure that the old trigger sees the stopped node, todo find a better way to do this
- Thread.sleep(500 + TimeUnit.SECONDS.toMillis(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS));
-
- waitForSeconds = 0;
- setTriggerCommand = "{" +
- "'set-trigger' : {" +
- "'name' : 'node_lost_restore_trigger'," +
- "'event' : 'nodeLost'," +
- "'waitFor' : '0s'," + // update a property so that it replaces the old trigger, also we want it to fire immediately
- "'enabled' : true," +
- "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
- "}}";
- req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
- response = solrClient.request(req);
- assertEquals(response.get("result").toString(), "success");
-
- // wait until the second instance of action is created
- if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
- fail("Two TriggerAction instances should have been created by now");
- }
-
- boolean await = triggerFiredLatch.await(5, TimeUnit.SECONDS);
- assertTrue("The trigger did not fire at all", await);
- assertTrue(triggerFired.get());
- NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) events.iterator().next();
- assertNotNull(nodeLostEvent);
- List<String> nodeNames = (List<String>)nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
- assertTrue(nodeNames.contains(nodeName));
- }
-
- @Test
- public void testNodeAddedTriggerRestoreState() throws Exception {
- // for this test we want to update the trigger so we must assert that the actions were created twice
- TriggerIntegrationTest.actionInitCalled = new CountDownLatch(2);
-
- CloudSolrClient solrClient = cluster.getSolrClient();
- waitForSeconds = 5;
- String setTriggerCommand = "{" +
- "'set-trigger' : {" +
- "'name' : 'node_added_restore_trigger'," +
- "'event' : 'nodeAdded'," +
- "'waitFor' : '5s'," + // should be enough for us to update the trigger
- "'enabled' : true," +
- "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
- "}}";
- SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
- NamedList<Object> response = solrClient.request(req);
- assertEquals(response.get("result").toString(), "success");
-
- TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS, cloudManager.getTimeSource());
- while (actionInitCalled.getCount() == 0 && !timeOut.hasTimedOut()) {
- Thread.sleep(200);
- }
- assertTrue("The action specified in node_added_restore_trigger was not instantiated even after 2 seconds", actionInitCalled.getCount() > 0);
-
- // start a new node
- JettySolrRunner newNode = cluster.startJettySolrRunner();
-
- // ensure that the old trigger sees the new node, todo find a better way to do this
- Thread.sleep(500 + TimeUnit.SECONDS.toMillis(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS));
-
- waitForSeconds = 0;
- setTriggerCommand = "{" +
- "'set-trigger' : {" +
- "'name' : 'node_added_restore_trigger'," +
- "'event' : 'nodeAdded'," +
- "'waitFor' : '0s'," + // update a property so that it replaces the old trigger, also we want it to fire immediately
- "'enabled' : true," +
- "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
- "}}";
- req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
- response = solrClient.request(req);
- assertEquals(response.get("result").toString(), "success");
-
- // wait until the second instance of action is created
- if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
- fail("Two TriggerAction instances should have been created by now");
- }
-
- boolean await = triggerFiredLatch.await(5, TimeUnit.SECONDS);
- assertTrue("The trigger did not fire at all", await);
- assertTrue(triggerFired.get());
- NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
- assertNotNull(nodeAddedEvent);
- List<String> nodeNames = (List<String>)nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
- assertTrue(nodeNames.contains(newNode.getNodeName()));
- }
-
- @Test
- public void testNodeAddedTrigger() throws Exception {
- CloudSolrClient solrClient = cluster.getSolrClient();
- String setTriggerCommand = "{" +
- "'set-trigger' : {" +
- "'name' : 'node_added_trigger'," +
- "'event' : 'nodeAdded'," +
- "'waitFor' : '" + waitForSeconds + "s'," +
- "'enabled' : true," +
- "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
- "}}";
- SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
- NamedList<Object> response = solrClient.request(req);
- assertEquals(response.get("result").toString(), "success");
-
- if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
- fail("The TriggerAction should have been created by now");
- }
-
- JettySolrRunner newNode = cluster.startJettySolrRunner();
- boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
- assertTrue("The trigger did not fire at all", await);
- assertTrue(triggerFired.get());
- NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
- assertNotNull(nodeAddedEvent);
- List<String> nodeNames = (List<String>)nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
- assertTrue(nodeNames.contains(newNode.getNodeName()));
-
- // reset
- actionConstructorCalled = new CountDownLatch(1);
- actionInitCalled = new CountDownLatch(1);
-
- // update the trigger with exactly the same data
- setTriggerCommand = "{" +
- "'set-trigger' : {" +
- "'name' : 'node_added_trigger'," +
- "'event' : 'nodeAdded'," +
- "'waitFor' : '" + waitForSeconds + "s'," +
- "'enabled' : true," +
- "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
- "}}";
- req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
- response = solrClient.request(req);
- assertEquals(response.get("result").toString(), "success");
-
- // this should be a no-op so the action should have been created but init should not be called
- if (!actionConstructorCalled.await(3, TimeUnit.SECONDS)) {
- fail("The TriggerAction should have been created by now");
- }
-
- assertFalse(actionInitCalled.await(2, TimeUnit.SECONDS));
- }
-
- @Test
- public void testNodeLostTrigger() throws Exception {
- CloudSolrClient solrClient = cluster.getSolrClient();
- String setTriggerCommand = "{" +
- "'set-trigger' : {" +
- "'name' : 'node_lost_trigger'," +
- "'event' : 'nodeLost'," +
- "'waitFor' : '" + waitForSeconds + "s'," +
- "'enabled' : true," +
- "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
- "}}";
- NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
- String overseerLeader = (String) overSeerStatus.get("leader");
- int nonOverseerLeaderIndex = 0;
- for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
- JettySolrRunner jetty = cluster.getJettySolrRunner(i);
- if (!jetty.getNodeName().equals(overseerLeader)) {
- nonOverseerLeaderIndex = i;
- }
- }
- SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
- NamedList<Object> response = solrClient.request(req);
- assertEquals(response.get("result").toString(), "success");
-
- if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
- fail("The TriggerAction should have been created by now");
- }
-
- triggerFired.set(false);
- triggerFiredLatch = new CountDownLatch(1);
- String lostNodeName = cluster.getJettySolrRunner(nonOverseerLeaderIndex).getNodeName();
- cluster.stopJettySolrRunner(nonOverseerLeaderIndex);
- boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
- assertTrue("The trigger did not fire at all", await);
- assertTrue(triggerFired.get());
- NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) events.iterator().next();
- assertNotNull(nodeLostEvent);
- List<String> nodeNames = (List<String>)nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
- assertTrue(nodeNames.contains(lostNodeName));
-
- // reset
- actionConstructorCalled = new CountDownLatch(1);
- actionInitCalled = new CountDownLatch(1);
-
- // update the trigger with exactly the same data
- setTriggerCommand = "{" +
- "'set-trigger' : {" +
- "'name' : 'node_lost_trigger'," +
- "'event' : 'nodeLost'," +
- "'waitFor' : '" + waitForSeconds + "s'," +
- "'enabled' : true," +
- "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
- "}}";
- req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
- response = solrClient.request(req);
- assertEquals(response.get("result").toString(), "success");
-
- // this should be a no-op so the action should have been created but init should not be called
- if (!actionConstructorCalled.await(3, TimeUnit.SECONDS)) {
- fail("The TriggerAction should have been created by now");
- }
-
- assertFalse(actionInitCalled.await(2, TimeUnit.SECONDS));
- }
-
- @Test
public void testContinueTriggersOnOverseerRestart() throws Exception {
CollectionAdminRequest.OverseerStatus status = new CollectionAdminRequest.OverseerStatus();
CloudSolrClient solrClient = cluster.getSolrClient();
@@ -814,194 +549,6 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertTrue(triggerFired.get());
}
- private static class TestLiveNodesListener implements LiveNodesListener {
- Set<String> lostNodes = new HashSet<>();
- Set<String> addedNodes = new HashSet<>();
- CountDownLatch onChangeLatch = new CountDownLatch(1);
-
- public void reset() {
- lostNodes.clear();
- addedNodes.clear();
- onChangeLatch = new CountDownLatch(1);
- }
-
- @Override
- public void onChange(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes) {
- onChangeLatch.countDown();
- Set<String> old = new HashSet<>(oldLiveNodes);
- old.removeAll(newLiveNodes);
- if (!old.isEmpty()) {
- lostNodes.addAll(old);
- }
- newLiveNodes.removeAll(oldLiveNodes);
- if (!newLiveNodes.isEmpty()) {
- addedNodes.addAll(newLiveNodes);
- }
- }
- }
-
- private TestLiveNodesListener registerLiveNodesListener() {
- TestLiveNodesListener listener = new TestLiveNodesListener();
- zkStateReader.registerLiveNodesListener(listener);
- return listener;
- }
-
- public static class TestEventMarkerAction extends TriggerActionBase {
-
- public TestEventMarkerAction() {
- actionConstructorCalled.countDown();
- }
-
- @Override
- public void process(TriggerEvent event, ActionContext actionContext) {
- boolean locked = lock.tryLock();
- if (!locked) {
- log.info("We should never have a tryLock fail because actions are never supposed to be executed concurrently");
- return;
- }
- try {
- events.add(event);
- getTriggerFiredLatch().countDown();
- } catch (Throwable t) {
- log.debug("--throwable", t);
- throw t;
- } finally {
- lock.unlock();
- }
- }
-
- @Override
- public void init(Map<String, String> args) {
- log.info("TestEventMarkerAction init");
- actionInitCalled.countDown();
- super.init(args);
- }
- }
-
- @Test
- @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
- public void testNodeMarkersRegistration() throws Exception {
- // for this test we want to create two triggers so we must assert that the actions were created twice
- actionInitCalled = new CountDownLatch(2);
- // similarly we want both triggers to fire
- triggerFiredLatch = new CountDownLatch(2);
- TestLiveNodesListener listener = registerLiveNodesListener();
-
- NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
- String overseerLeader = (String) overSeerStatus.get("leader");
- int overseerLeaderIndex = 0;
- for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
- JettySolrRunner jetty = cluster.getJettySolrRunner(i);
- if (jetty.getNodeName().equals(overseerLeader)) {
- overseerLeaderIndex = i;
- break;
- }
- }
- // add a node
- JettySolrRunner node = cluster.startJettySolrRunner();
- if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
- fail("onChange listener didn't execute on cluster change");
- }
- assertEquals(1, listener.addedNodes.size());
- assertEquals(node.getNodeName(), listener.addedNodes.iterator().next());
- // verify that a znode doesn't exist (no trigger)
- String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node.getNodeName();
- assertFalse("Path " + pathAdded + " was created but there are no nodeAdded triggers", zkClient().exists(pathAdded, true));
- listener.reset();
- // stop overseer
- log.info("====== KILL OVERSEER 1");
- cluster.stopJettySolrRunner(overseerLeaderIndex);
- if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
- fail("onChange listener didn't execute on cluster change");
- }
- assertEquals(1, listener.lostNodes.size());
- assertEquals(overseerLeader, listener.lostNodes.iterator().next());
- assertEquals(0, listener.addedNodes.size());
- // wait until the new overseer is up
- Thread.sleep(5000);
- // verify that a znode does NOT exist - there's no nodeLost trigger,
- // so the new overseer cleaned up existing nodeLost markers
- String pathLost = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + overseerLeader;
- assertFalse("Path " + pathLost + " exists", zkClient().exists(pathLost, true));
-
- listener.reset();
-
- // set up triggers
- CloudSolrClient solrClient = cluster.getSolrClient();
-
- log.info("====== ADD TRIGGERS");
- String setTriggerCommand = "{" +
- "'set-trigger' : {" +
- "'name' : 'node_added_triggerMR'," +
- "'event' : 'nodeAdded'," +
- "'waitFor' : '1s'," +
- "'enabled' : true," +
- "'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
- "}}";
- SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
- NamedList<Object> response = solrClient.request(req);
- assertEquals(response.get("result").toString(), "success");
-
- setTriggerCommand = "{" +
- "'set-trigger' : {" +
- "'name' : 'node_lost_triggerMR'," +
- "'event' : 'nodeLost'," +
- "'waitFor' : '1s'," +
- "'enabled' : true," +
- "'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
- "}}";
- req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
- response = solrClient.request(req);
- assertEquals(response.get("result").toString(), "success");
-
- overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
- overseerLeader = (String) overSeerStatus.get("leader");
- overseerLeaderIndex = 0;
- for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
- JettySolrRunner jetty = cluster.getJettySolrRunner(i);
- if (jetty.getNodeName().equals(overseerLeader)) {
- overseerLeaderIndex = i;
- break;
- }
- }
-
- // create another node
- log.info("====== ADD NODE 1");
- JettySolrRunner node1 = cluster.startJettySolrRunner();
- if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
- fail("onChange listener didn't execute on cluster change");
- }
- assertEquals(1, listener.addedNodes.size());
- assertEquals(node1.getNodeName(), listener.addedNodes.iterator().next());
- // verify that a znode exists
- pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node1.getNodeName();
- assertTrue("Path " + pathAdded + " wasn't created", zkClient().exists(pathAdded, true));
-
- Thread.sleep(5000);
- // nodeAdded marker should be consumed now by nodeAdded trigger
- assertFalse("Path " + pathAdded + " should have been deleted", zkClient().exists(pathAdded, true));
-
- listener.reset();
- events.clear();
- triggerFiredLatch = new CountDownLatch(1);
- // kill overseer again
- log.info("====== KILL OVERSEER 2");
- cluster.stopJettySolrRunner(overseerLeaderIndex);
- if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
- fail("onChange listener didn't execute on cluster change");
- }
-
-
- if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
- fail("Trigger should have fired by now");
- }
- assertEquals(1, events.size());
- TriggerEvent ev = events.iterator().next();
- List<String> nodeNames = (List<String>)ev.getProperty(TriggerEvent.NODE_NAMES);
- assertTrue(nodeNames.contains(overseerLeader));
- assertEquals(TriggerEventType.NODELOST, ev.getEventType());
- }
-
static Map<String, List<CapturedEvent>> listenerEvents = new HashMap<>();
static CountDownLatch listenerCreated = new CountDownLatch(1);
static boolean failDummyAction = false;
@@ -1175,551 +722,4 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertEquals(TriggerEventProcessorStage.FAILED, capturedEvents.get(3).stage);
assertEquals("test1", capturedEvents.get(3).actionName);
}
-
- @Test
- public void testCooldown() throws Exception {
- CloudSolrClient solrClient = cluster.getSolrClient();
- failDummyAction = false;
- waitForSeconds = 1;
- String setTriggerCommand = "{" +
- "'set-trigger' : {" +
- "'name' : 'node_added_cooldown_trigger'," +
- "'event' : 'nodeAdded'," +
- "'waitFor' : '" + waitForSeconds + "s'," +
- "'enabled' : true," +
- "'actions' : [" +
- "{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}" +
- "]" +
- "}}";
- SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
- NamedList<Object> response = solrClient.request(req);
- assertEquals(response.get("result").toString(), "success");
-
- String setListenerCommand1 = "{" +
- "'set-listener' : " +
- "{" +
- "'name' : 'bar'," +
- "'trigger' : 'node_added_cooldown_trigger'," +
- "'stage' : ['FAILED','SUCCEEDED', 'IGNORED']," +
- "'class' : '" + TestTriggerListener.class.getName() + "'" +
- "}" +
- "}";
- req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
- response = solrClient.request(req);
- assertEquals(response.get("result").toString(), "success");
-
- listenerCreated = new CountDownLatch(1);
- listenerEvents.clear();
-
- JettySolrRunner newNode = cluster.startJettySolrRunner();
- boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
- assertTrue("The trigger did not fire at all", await);
- assertTrue(triggerFired.get());
- // wait for listener to capture the SUCCEEDED stage
- Thread.sleep(1000);
-
- List<CapturedEvent> capturedEvents = listenerEvents.get("bar");
- // we may get a few IGNORED events if other tests caused events within cooldown period
- assertTrue(capturedEvents.toString(), capturedEvents.size() > 0);
- long prevTimestamp = capturedEvents.get(capturedEvents.size() - 1).timestamp;
-
- // reset the trigger and captured events
- listenerEvents.clear();
- triggerFiredLatch = new CountDownLatch(1);
- triggerFired.compareAndSet(true, false);
-
- JettySolrRunner newNode2 = cluster.startJettySolrRunner();
- await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
- assertTrue("The trigger did not fire at all", await);
- // wait for listener to capture the SUCCEEDED stage
- Thread.sleep(2000);
-
- // there must be at least one IGNORED event due to cooldown, and one SUCCEEDED event
- capturedEvents = listenerEvents.get("bar");
- assertEquals(capturedEvents.toString(),1, capturedEvents.size());
- CapturedEvent ev = capturedEvents.get(0);
- assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
- // the difference between timestamps of the first SUCCEEDED and the last SUCCEEDED
- // must be larger than cooldown period
- assertTrue("timestamp delta is less than default cooldown period", ev.timestamp - prevTimestamp > TimeUnit.SECONDS.toNanos(ScheduledTriggers.DEFAULT_COOLDOWN_PERIOD_SECONDS));
- prevTimestamp = ev.timestamp;
-
- // this also resets the cooldown period
- long modifiedCooldownPeriodSeconds = 7;
- String setPropertiesCommand = "{\n" +
- "\t\"set-properties\" : {\n" +
- "\t\t\"" + AutoScalingParams.TRIGGER_COOLDOWN_PERIOD_SECONDS + "\" : " + modifiedCooldownPeriodSeconds + "\n" +
- "\t}\n" +
- "}";
- solrClient.request(createAutoScalingRequest(SolrRequest.METHOD.POST, setPropertiesCommand));
- req = createAutoScalingRequest(SolrRequest.METHOD.GET, null);
- response = solrClient.request(req);
-
- // reset the trigger and captured events
- listenerEvents.clear();
- triggerFiredLatch = new CountDownLatch(1);
- triggerFired.compareAndSet(true, false);
-
- JettySolrRunner newNode3 = cluster.startJettySolrRunner();
- await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
- assertTrue("The trigger did not fire at all", await);
- triggerFiredLatch = new CountDownLatch(1);
- triggerFired.compareAndSet(true, false);
- // add another node
- JettySolrRunner newNode4 = cluster.startJettySolrRunner();
- await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
- assertTrue("The trigger did not fire at all", await);
- // wait for listener to capture the SUCCEEDED stage
- Thread.sleep(2000);
-
- // there must be two SUCCEEDED (due to newNode3 and newNode4) and maybe some ignored events
- capturedEvents = listenerEvents.get("bar");
- assertTrue(capturedEvents.toString(), capturedEvents.size() >= 2);
- // first event should be SUCCEEDED
- ev = capturedEvents.get(0);
- assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
-
- ev = capturedEvents.get(capturedEvents.size() - 1);
- assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
- // the difference between timestamps of the first SUCCEEDED and the last SUCCEEDED
- // must be larger than the modified cooldown period
- assertTrue("timestamp delta is less than default cooldown period", ev.timestamp - prevTimestamp > TimeUnit.SECONDS.toNanos(modifiedCooldownPeriodSeconds));
- }
-
- public void testSetProperties() throws Exception {
- JettySolrRunner runner = cluster.getJettySolrRunner(0);
- SolrResourceLoader resourceLoader = runner.getCoreContainer().getResourceLoader();
- SolrCloudManager solrCloudManager = runner.getCoreContainer().getZkController().getSolrCloudManager();
- AtomicLong diff = new AtomicLong(0);
- triggerFiredLatch = new CountDownLatch(2); // have the trigger run twice to capture time difference
- try (ScheduledTriggers scheduledTriggers = new ScheduledTriggers(resourceLoader, solrCloudManager)) {
- AutoScalingConfig config = new AutoScalingConfig(Collections.emptyMap());
- scheduledTriggers.setAutoScalingConfig(config);
- scheduledTriggers.add(new TriggerBase(TriggerEventType.NODELOST, "x", Collections.emptyMap(), resourceLoader, solrCloudManager) {
- @Override
- protected Map<String, Object> getState() {
- return Collections.singletonMap("x","y");
- }
-
- @Override
- protected void setState(Map<String, Object> state) {
-
- }
-
- @Override
- public void restoreState(AutoScaling.Trigger old) {
-
- }
-
- @Override
- public void run() {
- if (getTriggerFiredLatch().getCount() == 0) return;
- long l = diff.get();
- diff.set(timeSource.getTimeNs() - l);
- getTriggerFiredLatch().countDown();
- }
- });
- assertTrue(getTriggerFiredLatch().await(4, TimeUnit.SECONDS));
- assertTrue(diff.get() - TimeUnit.SECONDS.toNanos(ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS) >= 0);
-
- // change schedule delay
- config = config.withProperties(Collections.singletonMap(AutoScalingParams.TRIGGER_SCHEDULE_DELAY_SECONDS, 4));
- scheduledTriggers.setAutoScalingConfig(config);
- triggerFiredLatch = new CountDownLatch(2);
- assertTrue("Timed out waiting for latch to fire", getTriggerFiredLatch().await(10, TimeUnit.SECONDS));
- assertTrue(diff.get() - TimeUnit.SECONDS.toNanos(4) >= 0);
-
- // reset with default properties
- scheduledTriggers.remove("x"); // remove the old trigger
- config = config.withProperties(ScheduledTriggers.DEFAULT_PROPERTIES);
- scheduledTriggers.setAutoScalingConfig(config);
-
- // test core thread count
- List<AutoScaling.Trigger> triggerList = new ArrayList<>();
- final Set<String> threadNames = Collections.synchronizedSet(new HashSet<>());
- final Set<String> triggerNames = Collections.synchronizedSet(new HashSet<>());
- triggerFiredLatch = new CountDownLatch(8);
- for (int i = 0; i < 8; i++) {
- triggerList.add(new MockTrigger(TriggerEventType.NODELOST, "x" + i, Collections.emptyMap(), resourceLoader, solrCloudManager) {
- @Override
- public void run() {
- try {
- // If core pool size is increased then new threads won't be started if existing threads
- // aren't busy with tasks. So we make this thread wait longer than necessary
- // so that the pool is forced to start threads for other triggers
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- }
- if (triggerNames.add(getName())) {
- getTriggerFiredLatch().countDown();
- threadNames.add(Thread.currentThread().getName());
- }
- }
- });
- scheduledTriggers.add(triggerList.get(i));
- }
- assertTrue("Timed out waiting for latch to fire", getTriggerFiredLatch().await(20, TimeUnit.SECONDS));
- assertEquals("Expected 8 triggers but found: " + triggerNames,8, triggerNames.size());
- assertEquals("Expected " + ScheduledTriggers.DEFAULT_TRIGGER_CORE_POOL_SIZE
- + " threads but found: " + threadNames,
- ScheduledTriggers.DEFAULT_TRIGGER_CORE_POOL_SIZE, threadNames.size());
-
- // change core pool size
- config = config.withProperties(Collections.singletonMap(AutoScalingParams.TRIGGER_CORE_POOL_SIZE, 6));
- scheduledTriggers.setAutoScalingConfig(config);
- triggerFiredLatch = new CountDownLatch(8);
- threadNames.clear();
- triggerNames.clear();
- assertTrue(getTriggerFiredLatch().await(20, TimeUnit.SECONDS));
- assertEquals("Expected 8 triggers but found: " + triggerNames,8, triggerNames.size());
- assertEquals("Expected 6 threads but found: " + threadNames,6, threadNames.size());
-
- // reset
- for (int i = 0; i < 8; i++) {
- scheduledTriggers.remove(triggerList.get(i).getName());
- }
- }
- }
-
- public static class MockTrigger extends TriggerBase {
-
- public MockTrigger(TriggerEventType eventType, String name, Map<String, Object> properties, SolrResourceLoader loader, SolrCloudManager cloudManager) {
- super(eventType, name, properties, loader, cloudManager);
- }
-
- @Override
- protected Map<String, Object> getState() {
- return Collections.emptyMap();
- }
-
- @Override
- protected void setState(Map<String, Object> state) {
-
- }
-
- @Override
- public void restoreState(AutoScaling.Trigger old) {
-
- }
-
- @Override
- public void run() {
-
- }
- }
-
- public static class TestSearchRateAction extends TriggerActionBase {
-
- @Override
- public void process(TriggerEvent event, ActionContext context) throws Exception {
- try {
- events.add(event);
- long currentTimeNanos = timeSource.getTimeNs();
- long eventTimeNanos = event.getEventTime();
- long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
- if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
- fail(event.source + " was fired before the configured waitFor period");
- }
- getTriggerFiredLatch().countDown();
- } catch (Throwable t) {
- log.debug("--throwable", t);
- throw t;
- }
- }
- }
-
- @Test
- @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
- public void testSearchRate() throws Exception {
- // start a few more jetty-s
- for (int i = 0; i < 3; i++) {
- cluster.startJettySolrRunner();
- }
- CloudSolrClient solrClient = cluster.getSolrClient();
- String COLL1 = "collection1";
- CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
- "conf", 1, 2);
- create.process(solrClient);
- String setTriggerCommand = "{" +
- "'set-trigger' : {" +
- "'name' : 'search_rate_trigger'," +
- "'event' : 'searchRate'," +
- "'waitFor' : '" + waitForSeconds + "s'," +
- "'enabled' : true," +
- "'rate' : 1.0," +
- "'actions' : [" +
- "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
- "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
- "{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" +
- "]" +
- "}}";
- SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
- NamedList<Object> response = solrClient.request(req);
- assertEquals(response.get("result").toString(), "success");
-
- String setListenerCommand1 = "{" +
- "'set-listener' : " +
- "{" +
- "'name' : 'srt'," +
- "'trigger' : 'search_rate_trigger'," +
- "'stage' : ['FAILED','SUCCEEDED']," +
- "'afterAction': ['compute', 'execute', 'test']," +
- "'class' : '" + TestTriggerListener.class.getName() + "'" +
- "}" +
- "}";
- req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
- response = solrClient.request(req);
- assertEquals(response.get("result").toString(), "success");
- SolrParams query = params(CommonParams.Q, "*:*");
- for (int i = 0; i < 500; i++) {
- solrClient.query(COLL1, query);
- }
- boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
- assertTrue("The trigger did not fire at all", await);
- // wait for listener to capture the SUCCEEDED stage
- Thread.sleep(5000);
- List<CapturedEvent> events = listenerEvents.get("srt");
- assertEquals(listenerEvents.toString(), 4, events.size());
- assertEquals("AFTER_ACTION", events.get(0).stage.toString());
- assertEquals("compute", events.get(0).actionName);
- assertEquals("AFTER_ACTION", events.get(1).stage.toString());
- assertEquals("execute", events.get(1).actionName);
- assertEquals("AFTER_ACTION", events.get(2).stage.toString());
- assertEquals("test", events.get(2).actionName);
- assertEquals("SUCCEEDED", events.get(3).stage.toString());
- assertNull(events.get(3).actionName);
-
- CapturedEvent ev = events.get(0);
- long now = timeSource.getTimeNs();
- // verify waitFor
- assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
- Map<String, Double> nodeRates = (Map<String, Double>)ev.event.getProperties().get("node");
- assertNotNull("nodeRates", nodeRates);
- assertTrue(nodeRates.toString(), nodeRates.size() > 0);
- AtomicDouble totalNodeRate = new AtomicDouble();
- nodeRates.forEach((n, r) -> totalNodeRate.addAndGet(r));
- List<ReplicaInfo> replicaRates = (List<ReplicaInfo>)ev.event.getProperties().get("replica");
- assertNotNull("replicaRates", replicaRates);
- assertTrue(replicaRates.toString(), replicaRates.size() > 0);
- AtomicDouble totalReplicaRate = new AtomicDouble();
- replicaRates.forEach(r -> {
- assertTrue(r.toString(), r.getVariable("rate") != null);
- totalReplicaRate.addAndGet((Double)r.getVariable("rate"));
- });
- Map<String, Object> shardRates = (Map<String, Object>)ev.event.getProperties().get("shard");
- assertNotNull("shardRates", shardRates);
- assertEquals(shardRates.toString(), 1, shardRates.size());
- shardRates = (Map<String, Object>)shardRates.get(COLL1);
- assertNotNull("shardRates", shardRates);
- assertEquals(shardRates.toString(), 1, shardRates.size());
- AtomicDouble totalShardRate = new AtomicDouble();
- shardRates.forEach((s, r) -> totalShardRate.addAndGet((Double)r));
- Map<String, Double> collectionRates = (Map<String, Double>)ev.event.getProperties().get("collection");
- assertNotNull("collectionRates", collectionRates);
- assertEquals(collectionRates.toString(), 1, collectionRates.size());
- Double collectionRate = collectionRates.get(COLL1);
- assertNotNull(collectionRate);
- assertTrue(collectionRate > 5.0);
- assertEquals(collectionRate, totalNodeRate.get(), 5.0);
- assertEquals(collectionRate, totalShardRate.get(), 5.0);
- assertEquals(collectionRate, totalReplicaRate.get(), 5.0);
-
- // check operations
- List<Map<String, Object>> ops = (List<Map<String, Object>>)ev.context.get("properties.operations");
- assertNotNull(ops);
- assertTrue(ops.size() > 1);
- for (Map<String, Object> m : ops) {
- assertEquals("ADDREPLICA", m.get("params.action"));
- }
- }
-
- @Test
- public void testMetricTrigger() throws Exception {
- cluster.waitForAllNodes(5);
-
- String collectionName = "testMetricTrigger";
- CloudSolrClient solrClient = cluster.getSolrClient();
- CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
- "conf", 2, 2).setMaxShardsPerNode(2);
- create.process(solrClient);
- solrClient.setDefaultCollection(collectionName);
-
- waitForState("Timed out waiting for collection:" + collectionName + " to become active", collectionName, clusterShape(2, 2));
-
- DocCollection docCollection = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
- String shardId = "shard1";
- Replica replica = docCollection.getSlice(shardId).getReplicas().iterator().next();
- String coreName = replica.getCoreName();
- String replicaName = Utils.parseMetricsReplicaName(collectionName, coreName);
- long waitForSeconds = 2 + random().nextInt(5);
- String registry = SolrCoreMetricManager.createRegistryName(true, collectionName, shardId, replicaName, null);
- String tag = "metrics:" + registry + ":INDEX.sizeInBytes";
-
- String setTriggerCommand = "{" +
- "'set-trigger' : {" +
- "'name' : 'metric_trigger'," +
- "'event' : 'metric'," +
- "'waitFor' : '" + waitForSeconds + "s'," +
- "'enabled' : true," +
- "'metric': '" + tag + "'" +
- "'above' : 100.0," +
- "'collection': '" + collectionName + "'" +
- "'shard':'" + shardId + "'" +
- "'actions' : [" +
- "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
- "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
- "{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" +
- "]" +
- "}}";
- SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
- NamedList<Object> response = solrClient.request(req);
- assertEquals(response.get("result").toString(), "success");
-
- String setListenerCommand1 = "{" +
- "'set-listener' : " +
- "{" +
- "'name' : 'srt'," +
- "'trigger' : 'metric_trigger'," +
- "'stage' : ['FAILED','SUCCEEDED']," +
- "'afterAction': ['compute', 'execute', 'test']," +
- "'class' : '" + TestTriggerListener.class.getName() + "'" +
- "}" +
- "}";
- req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
- response = solrClient.request(req);
- assertEquals(response.get("result").toString(), "success");
-
- // start more nodes so that we have at least 4
- for (int i = cluster.getJettySolrRunners().size(); i < 4; i++) {
- cluster.startJettySolrRunner();
- }
- cluster.waitForAllNodes(10);
-
- List<SolrInputDocument> docs = new ArrayList<>(500);
- for (int i = 0; i < 500; i++) {
- docs.add(new SolrInputDocument("id", String.valueOf(i), "x_s", "x" + i));
- }
- solrClient.add(docs);
- solrClient.commit();
-
- boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
- assertTrue("The trigger did not fire at all", await);
- // wait for listener to capture the SUCCEEDED stage
- Thread.sleep(2000);
- assertEquals(listenerEvents.toString(), 4, listenerEvents.get("srt").size());
- CapturedEvent ev = listenerEvents.get("srt").get(0);
- long now = timeSource.getTimeNs();
- // verify waitFor
- assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
- assertEquals(collectionName, ev.event.getProperties().get("collection"));
-
- // find a new replica and create its metric name
- docCollection = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
- replica = docCollection.getSlice(shardId).getReplicas().iterator().next();
- coreName = replica.getCoreName();
- replicaName = Utils.parseMetricsReplicaName(collectionName, coreName);
- registry = SolrCoreMetricManager.createRegistryName(true, collectionName, shardId, replicaName, null);
- tag = "metrics:" + registry + ":INDEX.sizeInBytes";
-
- triggerFiredLatch = new CountDownLatch(1);
- listenerEvents.clear();
-
- setTriggerCommand = "{" +
- "'set-trigger' : {" +
- "'name' : 'metric_trigger'," +
- "'event' : 'metric'," +
- "'waitFor' : '" + waitForSeconds + "s'," +
- "'enabled' : true," +
- "'metric': '" + tag + "'" +
- "'above' : 100.0," +
- "'collection': '" + collectionName + "'" +
- "'shard':'" + shardId + "'" +
- "'preferredOperation':'addreplica'" +
- "'actions' : [" +
- "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
- "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
- "{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" +
- "]" +
- "}}";
- req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
- response = solrClient.request(req);
- assertEquals(response.get("result").toString(), "success");
-
- await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
- assertTrue("The trigger did not fire at all", await);
- // wait for listener to capture the SUCCEEDED stage
- Thread.sleep(2000);
- assertEquals(listenerEvents.toString(), 4, listenerEvents.get("srt").size());
- ev = listenerEvents.get("srt").get(0);
- now = timeSource.getTimeNs();
- // verify waitFor
- assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
- assertEquals(collectionName, ev.event.getProperties().get("collection"));
- docCollection = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
- assertEquals(5, docCollection.getReplicas().size());
- }
-
- @Test
- @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 26-Mar-2018
- public void testScheduledTrigger() throws Exception {
- CloudSolrClient solrClient = cluster.getSolrClient();
-
- // this collection will place 2 cores on 1st node and 1 core on 2nd node
- String collectionName = "testScheduledTrigger";
- CollectionAdminRequest.createCollection(collectionName, 1, 3)
- .setMaxShardsPerNode(5).process(solrClient);
- waitForState("", collectionName, clusterShape(1, 3));
-
- // create a policy which allows only 1 core per node thereby creating a violation for the above collection
- String setClusterPolicy = "{\n" +
- " \"set-cluster-policy\" : [\n" +
- " {\"cores\" : \"<2\", \"node\" : \"#EACH\"}\n" +
- " ]\n" +
- "}";
- SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicy);
- NamedList<Object> response = solrClient.request(req);
- assertEquals(response.get("result").toString(), "success");
-
- // start a new node which can be used to balance the cluster as per policy
- JettySolrRunner newNode = cluster.startJettySolrRunner();
- cluster.waitForAllNodes(10);
-
- String setTriggerCommand = "{" +
- "'set-trigger' : {" +
- "'name' : 'sched_trigger_integration1'," +
- "'event' : 'scheduled'," +
- "'startTime' : '" + new Date().toInstant().toString() + "'" +
- "'every' : '+3SECONDS'" +
- "'actions' : [" +
- "{'name' : 'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
- "{'name' : 'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
- "{'name' : 'recorder', 'class': '" + ContextPropertiesRecorderAction.class.getName() + "'}" +
- "]}}";
- req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
- response = solrClient.request(req);
- assertEquals(response.get("result").toString(), "success");
-
- assertTrue("ScheduledTrigger did not fire within 20 seconds", triggerFiredLatch.await(20, TimeUnit.SECONDS));
- assertEquals(1, events.size());
- Map<String, Object> actionContextProps = actionContextPropertiesRef.get();
- assertNotNull(actionContextProps);
- TriggerEvent event = events.iterator().next();
- List<SolrRequest> operations = (List<SolrRequest>) actionContextProps.get("operations");
- assertNotNull(operations);
- assertEquals(1, operations.size());
- for (SolrRequest operation : operations) {
- SolrParams params = operation.getParams();
- assertEquals(newNode.getNodeName(), params.get("targetNode"));
- }
- }
-
- private static AtomicReference<Map<String, Object>> actionContextPropertiesRef = new AtomicReference<>();
-
- public static class ContextPropertiesRecorderAction extends TestEventMarkerAction {
- @Override
- public void process(TriggerEvent event, ActionContext actionContext) {
- actionContextPropertiesRef.set(actionContext.getProperties());
- super.process(event, actionContext);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8533a46c/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerSetPropertiesIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerSetPropertiesIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerSetPropertiesIntegrationTest.java
new file mode 100644
index 0000000..47ac227
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerSetPropertiesIntegrationTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.params.AutoScalingParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.core.SolrResourceLoader;
+import org.apache.solr.util.LogLevel;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.timeSource;
+
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
+public class TriggerSetPropertiesIntegrationTest extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static CountDownLatch triggerFiredLatch = new CountDownLatch(1);
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(5)
+ .addConfig("conf", configset("cloud-minimal"))
+ .configure();
+ // disable .scheduled_maintenance
+ String suspendTriggerCommand = "{" +
+ "'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
+ "}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+ SolrClient solrClient = cluster.getSolrClient();
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+ }
+
+ private static CountDownLatch getTriggerFiredLatch() {
+ return triggerFiredLatch;
+ }
+
+ public void testSetProperties() throws Exception {
+ JettySolrRunner runner = cluster.getJettySolrRunner(0);
+ SolrResourceLoader resourceLoader = runner.getCoreContainer().getResourceLoader();
+ SolrCloudManager solrCloudManager = runner.getCoreContainer().getZkController().getSolrCloudManager();
+ AtomicLong diff = new AtomicLong(0);
+ triggerFiredLatch = new CountDownLatch(2); // have the trigger run twice to capture time difference
+ try (ScheduledTriggers scheduledTriggers = new ScheduledTriggers(resourceLoader, solrCloudManager)) {
+ AutoScalingConfig config = new AutoScalingConfig(Collections.emptyMap());
+ scheduledTriggers.setAutoScalingConfig(config);
+ scheduledTriggers.add(new TriggerBase(TriggerEventType.NODELOST, "x", Collections.emptyMap(), resourceLoader, solrCloudManager) {
+ @Override
+ protected Map<String, Object> getState() {
+ return Collections.singletonMap("x", "y");
+ }
+
+ @Override
+ protected void setState(Map<String, Object> state) {
+
+ }
+
+ @Override
+ public void restoreState(AutoScaling.Trigger old) {
+
+ }
+
+ @Override
+ public void run() {
+ if (getTriggerFiredLatch().getCount() == 0) return;
+ long l = diff.get();
+ diff.set(timeSource.getTimeNs() - l);
+ getTriggerFiredLatch().countDown();
+ }
+ });
+ assertTrue(getTriggerFiredLatch().await(4, TimeUnit.SECONDS));
+ assertTrue(diff.get() - TimeUnit.SECONDS.toNanos(ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS) >= 0);
+
+ // change schedule delay
+ config = config.withProperties(Collections.singletonMap(AutoScalingParams.TRIGGER_SCHEDULE_DELAY_SECONDS, 4));
+ scheduledTriggers.setAutoScalingConfig(config);
+ triggerFiredLatch = new CountDownLatch(2);
+ assertTrue("Timed out waiting for latch to fire", getTriggerFiredLatch().await(10, TimeUnit.SECONDS));
+ assertTrue(diff.get() - TimeUnit.SECONDS.toNanos(4) >= 0);
+
+ // reset with default properties
+ scheduledTriggers.remove("x"); // remove the old trigger
+ config = config.withProperties(ScheduledTriggers.DEFAULT_PROPERTIES);
+ scheduledTriggers.setAutoScalingConfig(config);
+
+ // test core thread count
+ List<AutoScaling.Trigger> triggerList = new ArrayList<>();
+ final Set<String> threadNames = Collections.synchronizedSet(new HashSet<>());
+ final Set<String> triggerNames = Collections.synchronizedSet(new HashSet<>());
+ triggerFiredLatch = new CountDownLatch(8);
+ for (int i = 0; i < 8; i++) {
+ triggerList.add(new MockTrigger(TriggerEventType.NODELOST, "x" + i, Collections.emptyMap(), resourceLoader, solrCloudManager) {
+ @Override
+ public void run() {
+ try {
+ // If core pool size is increased then new threads won't be started if existing threads
+ // aren't busy with tasks. So we make this thread wait longer than necessary
+ // so that the pool is forced to start threads for other triggers
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ }
+ if (triggerNames.add(getName())) {
+ getTriggerFiredLatch().countDown();
+ threadNames.add(Thread.currentThread().getName());
+ }
+ }
+ });
+ scheduledTriggers.add(triggerList.get(i));
+ }
+ assertTrue("Timed out waiting for latch to fire", getTriggerFiredLatch().await(20, TimeUnit.SECONDS));
+ assertEquals("Expected 8 triggers but found: " + triggerNames, 8, triggerNames.size());
+ assertEquals("Expected " + ScheduledTriggers.DEFAULT_TRIGGER_CORE_POOL_SIZE
+ + " threads but found: " + threadNames,
+ ScheduledTriggers.DEFAULT_TRIGGER_CORE_POOL_SIZE, threadNames.size());
+
+ // change core pool size
+ config = config.withProperties(Collections.singletonMap(AutoScalingParams.TRIGGER_CORE_POOL_SIZE, 6));
+ scheduledTriggers.setAutoScalingConfig(config);
+ triggerFiredLatch = new CountDownLatch(8);
+ threadNames.clear();
+ triggerNames.clear();
+ assertTrue(getTriggerFiredLatch().await(20, TimeUnit.SECONDS));
+ assertEquals("Expected 8 triggers but found: " + triggerNames, 8, triggerNames.size());
+ assertEquals("Expected 6 threads but found: " + threadNames, 6, threadNames.size());
+
+ // reset
+ for (int i = 0; i < 8; i++) {
+ scheduledTriggers.remove(triggerList.get(i).getName());
+ }
+ }
+ }
+
+ public static class MockTrigger extends TriggerBase {
+
+ public MockTrigger(TriggerEventType eventType, String name, Map<String, Object> properties, SolrResourceLoader loader, SolrCloudManager cloudManager) {
+ super(eventType, name, properties, loader, cloudManager);
+ }
+
+ @Override
+ protected Map<String, Object> getState() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ protected void setState(Map<String, Object> state) {
+
+ }
+
+ @Override
+ public void restoreState(AutoScaling.Trigger old) {
+
+ }
+
+ @Override
+ public void run() {
+
+ }
+ }
+}
[5/6] lucene-solr:branch_7x: SOLR-12152: Extracted
TriggerIntegrationTest.testEventFromRestoredState into its own test class
Posted by sh...@apache.org.
SOLR-12152: Extracted TriggerIntegrationTest.testEventFromRestoredState into its own test class
(cherry picked from commit 0e5374e)
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/85f6da09
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/85f6da09
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/85f6da09
Branch: refs/heads/branch_7x
Commit: 85f6da096ecd525591326dba611a5527b9bcd317
Parents: 5dde890
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Fri Mar 30 12:41:18 2018 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Fri Mar 30 19:33:08 2018 +0530
----------------------------------------------------------------------
.../autoscaling/RestoreTriggerStateTest.java | 169 +++++++++++++++++++
.../autoscaling/TriggerIntegrationTest.java | 53 ------
2 files changed, 169 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/85f6da09/solr/core/src/test/org/apache/solr/cloud/autoscaling/RestoreTriggerStateTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/RestoreTriggerStateTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/RestoreTriggerStateTest.java
new file mode 100644
index 0000000..a3417cf
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/RestoreTriggerStateTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.util.LogLevel;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
+import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.timeSource;
+
+/**
+ * Integration test to ensure that triggers can restore state from ZooKeeper after overseer restart
+ * so that events detected before restart are not lost.
+ *
+ * Added in SOLR-10515
+ */
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
+public class RestoreTriggerStateTest extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static CountDownLatch actionInitCalled;
+ private static CountDownLatch triggerFiredLatch;
+ private static AtomicBoolean triggerFired;
+ private static CountDownLatch actionConstructorCalled;
+ private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
+ private static int waitForSeconds = 1;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(2)
+ .addConfig("conf", configset("cloud-minimal"))
+ .configure();
+ // disable .scheduled_maintenance
+ String suspendTriggerCommand = "{" +
+ "'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
+ "}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+ SolrClient solrClient = cluster.getSolrClient();
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+ actionInitCalled = new CountDownLatch(1);
+ triggerFiredLatch = new CountDownLatch(1);
+ actionConstructorCalled = new CountDownLatch(1);
+ triggerFired = new AtomicBoolean();
+ }
+
+ @Test
+ public void testEventFromRestoredState() throws Exception {
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_added_triggerEFRS'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor' : '10s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ if (!actionInitCalled.await(10, TimeUnit.SECONDS)) {
+ fail("The TriggerAction should have been created by now");
+ }
+
+ NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+ String overseerLeader = (String) overSeerStatus.get("leader");
+ int overseerLeaderIndex = 0;
+ for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+ JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+ if (jetty.getNodeName().equals(overseerLeader)) {
+ overseerLeaderIndex = i;
+ break;
+ }
+ }
+
+ events.clear();
+
+ JettySolrRunner newNode = cluster.startJettySolrRunner();
+ boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ assertTrue(triggerFired.get());
+ // reset
+ triggerFired.set(false);
+ triggerFiredLatch = new CountDownLatch(1);
+ NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
+ assertNotNull(nodeAddedEvent);
+ List<String> nodeNames = (List<String>) nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
+ assertTrue(nodeNames.contains(newNode.getNodeName()));
+ // add a second node - state of the trigger will change but it won't fire for waitFor sec.
+ JettySolrRunner newNode2 = cluster.startJettySolrRunner();
+ Thread.sleep(10000);
+ // kill overseer leader
+ cluster.stopJettySolrRunner(overseerLeaderIndex);
+ await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ assertTrue(triggerFired.get());
+ }
+
+ public static class TestTriggerAction extends TriggerActionBase {
+
+ public TestTriggerAction() {
+ actionConstructorCalled.countDown();
+ }
+
+ @Override
+ public void process(TriggerEvent event, ActionContext actionContext) {
+ try {
+ if (triggerFired.compareAndSet(false, true)) {
+ events.add(event);
+ long currentTimeNanos = timeSource.getTimeNs();
+ long eventTimeNanos = event.getEventTime();
+ long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+ if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+ fail(event.source + " was fired before the configured waitFor period");
+ }
+ triggerFiredLatch.countDown();
+ } else {
+ fail(event.source + " was fired more than once!");
+ }
+ } catch (Throwable t) {
+ log.debug("--throwable", t);
+ throw t;
+ }
+ }
+
+ @Override
+ public void init(Map<String, String> args) {
+ log.info("TestTriggerAction init");
+ actionInitCalled.countDown();
+ super.init(args);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/85f6da09/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index f536633..5dfe34c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -496,59 +496,6 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertTrue(triggerFired.get());
}
- @Test
- public void testEventFromRestoredState() throws Exception {
- CloudSolrClient solrClient = cluster.getSolrClient();
- String setTriggerCommand = "{" +
- "'set-trigger' : {" +
- "'name' : 'node_added_triggerEFRS'," +
- "'event' : 'nodeAdded'," +
- "'waitFor' : '10s'," +
- "'enabled' : true," +
- "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
- "}}";
- SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
- NamedList<Object> response = solrClient.request(req);
- assertEquals(response.get("result").toString(), "success");
-
- if (!actionInitCalled.await(10, TimeUnit.SECONDS)) {
- fail("The TriggerAction should have been created by now");
- }
-
- NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
- String overseerLeader = (String) overSeerStatus.get("leader");
- int overseerLeaderIndex = 0;
- for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
- JettySolrRunner jetty = cluster.getJettySolrRunner(i);
- if (jetty.getNodeName().equals(overseerLeader)) {
- overseerLeaderIndex = i;
- break;
- }
- }
-
- events.clear();
-
- JettySolrRunner newNode = cluster.startJettySolrRunner();
- boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
- assertTrue("The trigger did not fire at all", await);
- assertTrue(triggerFired.get());
- // reset
- triggerFired.set(false);
- triggerFiredLatch = new CountDownLatch(1);
- NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
- assertNotNull(nodeAddedEvent);
- List<String> nodeNames = (List<String>)nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
- assertTrue(nodeNames.contains(newNode.getNodeName()));
- // add a second node - state of the trigger will change but it won't fire for waitFor sec.
- JettySolrRunner newNode2 = cluster.startJettySolrRunner();
- Thread.sleep(10000);
- // kill overseer leader
- cluster.stopJettySolrRunner(overseerLeaderIndex);
- await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
- assertTrue("The trigger did not fire at all", await);
- assertTrue(triggerFired.get());
- }
-
static Map<String, List<CapturedEvent>> listenerEvents = new HashMap<>();
static CountDownLatch listenerCreated = new CountDownLatch(1);
static boolean failDummyAction = false;
[4/6] lucene-solr:branch_7x: SOLR-12152: Fix compilation error due to
missing import
Posted by sh...@apache.org.
SOLR-12152: Fix compilation error due to missing import
(cherry picked from commit 1aafc90)
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/5dde8901
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/5dde8901
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/5dde8901
Branch: refs/heads/branch_7x
Commit: 5dde89014ac2d32fdec193636665abf38351312a
Parents: ee9320f
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Fri Mar 30 12:23:06 2018 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Fri Mar 30 19:32:59 2018 +0530
----------------------------------------------------------------------
.../solr/cloud/autoscaling/ScheduledTriggerIntegrationTest.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/5dde8901/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledTriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledTriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledTriggerIntegrationTest.java
index 24e7420..24a8c6c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledTriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledTriggerIntegrationTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
@@ -47,7 +48,7 @@ import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAut
* Integration test for {@link ScheduledTrigger}
*/
@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
-@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 26-Mar-2018
+@LuceneTestCase.BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 26-Mar-2018
public class ScheduledTriggerIntegrationTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
[6/6] lucene-solr:branch_7x: SOLR-12133: Fix failures in
TriggerIntegrationTest.testEventQueue due to race conditions
Posted by sh...@apache.org.
SOLR-12133: Fix failures in TriggerIntegrationTest.testEventQueue due to race conditions
(cherry picked from commit 83cca5c)
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/83f77bcb
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/83f77bcb
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/83f77bcb
Branch: refs/heads/branch_7x
Commit: 83f77bcb4917a5b9c7cc60cffea29ab8aa0626a8
Parents: 85f6da0
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Fri Mar 30 16:42:35 2018 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Fri Mar 30 19:33:27 2018 +0530
----------------------------------------------------------------------
.../cloud/autoscaling/ScheduledTriggers.java | 13 +++++++++
.../autoscaling/TriggerIntegrationTest.java | 28 +++++++++++---------
.../apache/solr/common/util/ExecutorUtil.java | 4 +++
3 files changed, 32 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/83f77bcb/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
index 7cd2d71..5108fb5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
@@ -66,6 +66,7 @@ import static org.apache.solr.common.params.AutoScalingParams.ACTION_THROTTLE_PE
import static org.apache.solr.common.params.AutoScalingParams.TRIGGER_COOLDOWN_PERIOD_SECONDS;
import static org.apache.solr.common.params.AutoScalingParams.TRIGGER_CORE_POOL_SIZE;
import static org.apache.solr.common.params.AutoScalingParams.TRIGGER_SCHEDULE_DELAY_SECONDS;
+import static org.apache.solr.common.util.ExecutorUtil.awaitTermination;
/**
* Responsible for scheduling active triggers, starting and stopping them and
@@ -498,9 +499,21 @@ public class ScheduledTriggers implements Closeable {
}
// shutdown and interrupt all running tasks because there's no longer any
// guarantee about cluster state
+ log.debug("Shutting down scheduled thread pool executor now");
scheduledThreadPoolExecutor.shutdownNow();
+
+ log.debug("Shutting down action executor now");
actionExecutor.shutdownNow();
+
listeners.close();
+
+ log.debug("Awaiting termination for action executor");
+ awaitTermination(actionExecutor);
+
+ log.debug("Awaiting termination for scheduled thread pool executor");
+ awaitTermination(scheduledThreadPoolExecutor);
+
+ log.debug("ScheduledTriggers closed completely");
}
private class TriggerWrapper implements Runnable, Closeable {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/83f77bcb/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index 5dfe34c..2902c48 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -64,15 +64,16 @@ import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_P
public class TriggerIntegrationTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private static CountDownLatch actionConstructorCalled;
- private static CountDownLatch actionInitCalled;
- private static CountDownLatch triggerFiredLatch;
- private static int waitForSeconds = 1;
- private static CountDownLatch actionStarted;
- private static CountDownLatch actionInterrupted;
- private static CountDownLatch actionCompleted;
+ private static volatile CountDownLatch actionConstructorCalled;
+ private static volatile CountDownLatch actionInitCalled;
+ private static volatile CountDownLatch triggerFiredLatch;
+ private static volatile int waitForSeconds = 1;
+ private static volatile CountDownLatch actionStarted;
+ private static volatile CountDownLatch actionInterrupted;
+ private static volatile CountDownLatch actionCompleted;
private static AtomicBoolean triggerFired;
private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
+ public static volatile long eventQueueActionWait = 5000;
private static SolrCloudManager cloudManager;
// use the same time source as triggers use
@@ -166,6 +167,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
events.clear();
listenerEvents.clear();
lastActionExecutedAt.set(0);
+ eventQueueActionWait = 5000;
while (cluster.getJettySolrRunners().size() < 2) {
// perhaps a test stopped a node but didn't start it back
// lets start a node
@@ -415,14 +417,17 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
public void process(TriggerEvent event, ActionContext actionContext) {
log.info("-- event: " + event);
events.add(event);
+ long eventQueueActionWaitCopy = eventQueueActionWait;
getActionStarted().countDown();
try {
- Thread.sleep(eventQueueActionWait);
+ log.info("-- Going to sleep for {} ms", eventQueueActionWaitCopy);
+ Thread.sleep(eventQueueActionWaitCopy);
+ log.info("-- Woke up after sleeping for {} ms", eventQueueActionWaitCopy);
triggerFired.compareAndSet(false, true);
getActionCompleted().countDown();
} catch (InterruptedException e) {
+ log.info("-- Interrupted");
getActionInterrupted().countDown();
- return;
}
}
@@ -434,10 +439,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
}
- public static long eventQueueActionWait = 5000;
-
@Test
- @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
public void testEventQueue() throws Exception {
waitForSeconds = 1;
CloudSolrClient solrClient = cluster.getSolrClient();
@@ -471,6 +473,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
JettySolrRunner newNode = cluster.startJettySolrRunner();
boolean await = actionStarted.await(60, TimeUnit.SECONDS);
assertTrue("action did not start", await);
+ eventQueueActionWait = 1;
// event should be there
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
assertNotNull(nodeAddedEvent);
@@ -478,7 +481,6 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertFalse(triggerFired.get());
events.clear();
actionStarted = new CountDownLatch(1);
- eventQueueActionWait = 1;
// kill overseer leader
cluster.stopJettySolrRunner(overseerLeaderIndex);
Thread.sleep(5000);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/83f77bcb/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
index a045726..7458016 100644
--- a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
+++ b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java
@@ -73,6 +73,10 @@ public class ExecutorUtil {
public static void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
+ awaitTermination(pool);
+ }
+
+ public static void awaitTermination(ExecutorService pool) {
boolean shutdown = false;
while (!shutdown) {
try {
[3/6] lucene-solr:branch_7x: SOLR-12152: Fix node count to 2 for
TriggerSetPropertiesIntegrationTest
Posted by sh...@apache.org.
SOLR-12152: Fix node count to 2 for TriggerSetPropertiesIntegrationTest
(cherry picked from commit ac8cbaa)
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/ee9320fb
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/ee9320fb
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/ee9320fb
Branch: refs/heads/branch_7x
Commit: ee9320fb146b92e5955974f716b82867ab50a977
Parents: 8533a46
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Fri Mar 30 12:19:13 2018 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Fri Mar 30 19:32:51 2018 +0530
----------------------------------------------------------------------
.../cloud/autoscaling/TriggerSetPropertiesIntegrationTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/ee9320fb/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerSetPropertiesIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerSetPropertiesIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerSetPropertiesIntegrationTest.java
index 47ac227..5f4243f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerSetPropertiesIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerSetPropertiesIntegrationTest.java
@@ -54,7 +54,7 @@ public class TriggerSetPropertiesIntegrationTest extends SolrCloudTestCase {
@BeforeClass
public static void setupCluster() throws Exception {
- configureCluster(5)
+ configureCluster(2)
.addConfig("conf", configset("cloud-minimal"))
.configure();
// disable .scheduled_maintenance
[2/6] lucene-solr:branch_7x: SOLR-12152: Split up
TriggerIntegrationTest into multiple tests to isolate and increase
reliability
Posted by sh...@apache.org.
SOLR-12152: Split up TriggerIntegrationTest into multiple tests to isolate and increase reliability
(cherry picked from commit ed9e5eb)
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/8533a46c
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/8533a46c
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/8533a46c
Branch: refs/heads/branch_7x
Commit: 8533a46cc4588e20e55b4e9e5f64699cd1bb0273
Parents: 12106f0
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Fri Mar 30 11:08:56 2018 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Fri Mar 30 19:32:43 2018 +0530
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../MetricTriggerIntegrationTest.java | 242 +++++
.../NodeAddedTriggerIntegrationTest.java | 300 ++++++
.../NodeLostTriggerIntegrationTest.java | 322 ++++++
.../NodeMarkersRegistrationTest.java | 269 +++++
.../ScheduledTriggerIntegrationTest.java | 141 +++
.../SearchRateTriggerIntegrationTest.java | 217 ++++
.../TriggerCooldownIntegrationTest.java | 238 +++++
.../autoscaling/TriggerIntegrationTest.java | 1006 +-----------------
.../TriggerSetPropertiesIntegrationTest.java | 195 ++++
10 files changed, 1929 insertions(+), 1003 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8533a46c/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 17f30b1..8696913 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -108,6 +108,8 @@ Other Changes
* SOLR-12118: Solr Ref-Guide can now use some ivy version props directly as attributes in content (hossman)
+* SOLR-12152: Split up TriggerIntegrationTest into multiple tests to isolate and increase reliability. (shalin)
+
================== 7.3.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8533a46c/solr/core/src/test/org/apache/solr/cloud/autoscaling/MetricTriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/MetricTriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/MetricTriggerIntegrationTest.java
new file mode 100644
index 0000000..7b6da5a
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/MetricTriggerIntegrationTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.metrics.SolrCoreMetricManager;
+import org.apache.solr.util.LogLevel;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
+import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.timeSource;
+
+/**
+ * Integration test for {@link MetricTrigger}
+ */
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
+public class MetricTriggerIntegrationTest extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ static Map<String, List<CapturedEvent>> listenerEvents = new HashMap<>();
+ static CountDownLatch listenerCreated = new CountDownLatch(1);
+ private static CountDownLatch triggerFiredLatch;
+ private static int waitForSeconds = 1;
+ private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(2)
+ .addConfig("conf", configset("cloud-minimal"))
+ .configure();
+ // disable .scheduled_maintenance
+ String suspendTriggerCommand = "{" +
+ "'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
+ "}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+ SolrClient solrClient = cluster.getSolrClient();
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+ triggerFiredLatch = new CountDownLatch(1);
+ }
+
+ @Test
+ public void testMetricTrigger() throws Exception {
+ cluster.waitForAllNodes(5);
+
+ String collectionName = "testMetricTrigger";
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName,
+ "conf", 2, 2).setMaxShardsPerNode(2);
+ create.process(solrClient);
+ solrClient.setDefaultCollection(collectionName);
+
+ waitForState("Timed out waiting for collection:" + collectionName + " to become active", collectionName, clusterShape(2, 2));
+
+ DocCollection docCollection = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
+ String shardId = "shard1";
+ Replica replica = docCollection.getSlice(shardId).getReplicas().iterator().next();
+ String coreName = replica.getCoreName();
+ String replicaName = Utils.parseMetricsReplicaName(collectionName, coreName);
+ long waitForSeconds = 2 + random().nextInt(5);
+ String registry = SolrCoreMetricManager.createRegistryName(true, collectionName, shardId, replicaName, null);
+ String tag = "metrics:" + registry + ":INDEX.sizeInBytes";
+
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'metric_trigger'," +
+ "'event' : 'metric'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'enabled' : true," +
+ "'metric': '" + tag + "'" +
+ "'above' : 100.0," +
+ "'collection': '" + collectionName + "'" +
+ "'shard':'" + shardId + "'" +
+ "'actions' : [" +
+ "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
+ "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
+ "{'name':'test','class':'" + MetricAction.class.getName() + "'}" +
+ "]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ String setListenerCommand1 = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'srt'," +
+ "'trigger' : 'metric_trigger'," +
+ "'stage' : ['FAILED','SUCCEEDED']," +
+ "'afterAction': ['compute', 'execute', 'test']," +
+ "'class' : '" + TestTriggerListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // start more nodes so that we have at least 4
+ for (int i = cluster.getJettySolrRunners().size(); i < 4; i++) {
+ cluster.startJettySolrRunner();
+ }
+ cluster.waitForAllNodes(10);
+
+ List<SolrInputDocument> docs = new ArrayList<>(500);
+ for (int i = 0; i < 500; i++) {
+ docs.add(new SolrInputDocument("id", String.valueOf(i), "x_s", "x" + i));
+ }
+ solrClient.add(docs);
+ solrClient.commit();
+
+ boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ // wait for listener to capture the SUCCEEDED stage
+ Thread.sleep(2000);
+ assertEquals(listenerEvents.toString(), 4, listenerEvents.get("srt").size());
+ CapturedEvent ev = listenerEvents.get("srt").get(0);
+ long now = timeSource.getTimeNs();
+ // verify waitFor
+ assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
+ assertEquals(collectionName, ev.event.getProperties().get("collection"));
+
+ // find a new replica and create its metric name
+ docCollection = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
+ replica = docCollection.getSlice(shardId).getReplicas().iterator().next();
+ coreName = replica.getCoreName();
+ replicaName = Utils.parseMetricsReplicaName(collectionName, coreName);
+ registry = SolrCoreMetricManager.createRegistryName(true, collectionName, shardId, replicaName, null);
+ tag = "metrics:" + registry + ":INDEX.sizeInBytes";
+
+ triggerFiredLatch = new CountDownLatch(1);
+ listenerEvents.clear();
+
+ setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'metric_trigger'," +
+ "'event' : 'metric'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'enabled' : true," +
+ "'metric': '" + tag + "'" +
+ "'above' : 100.0," +
+ "'collection': '" + collectionName + "'" +
+ "'shard':'" + shardId + "'" +
+ "'preferredOperation':'addreplica'" +
+ "'actions' : [" +
+ "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
+ "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
+ "{'name':'test','class':'" + MetricAction.class.getName() + "'}" +
+ "]" +
+ "}}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ // wait for listener to capture the SUCCEEDED stage
+ Thread.sleep(2000);
+ assertEquals(listenerEvents.toString(), 4, listenerEvents.get("srt").size());
+ ev = listenerEvents.get("srt").get(0);
+ now = timeSource.getTimeNs();
+ // verify waitFor
+ assertTrue(TimeUnit.SECONDS.convert(waitForSeconds, TimeUnit.NANOSECONDS) - WAIT_FOR_DELTA_NANOS <= now - ev.event.getEventTime());
+ assertEquals(collectionName, ev.event.getProperties().get("collection"));
+ docCollection = solrClient.getZkStateReader().getClusterState().getCollection(collectionName);
+ assertEquals(5, docCollection.getReplicas().size());
+ }
+
+ public static class MetricAction extends TriggerActionBase {
+
+ @Override
+ public void process(TriggerEvent event, ActionContext context) throws Exception {
+ try {
+ events.add(event);
+ long currentTimeNanos = timeSource.getTimeNs();
+ long eventTimeNanos = event.getEventTime();
+ long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+ if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+ fail(event.source + " was fired before the configured waitFor period");
+ }
+ triggerFiredLatch.countDown();
+ } catch (Throwable t) {
+ log.debug("--throwable", t);
+ throw t;
+ }
+ }
+ }
+
+ public static class TestTriggerListener extends TriggerListenerBase {
+ @Override
+ public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) {
+ super.init(cloudManager, config);
+ listenerCreated.countDown();
+ }
+
+ @Override
+ public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
+ ActionContext context, Throwable error, String message) {
+ List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name, s -> new ArrayList<>());
+ lst.add(new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8533a46c/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerIntegrationTest.java
new file mode 100644
index 0000000..ecf2437
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerIntegrationTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.LogLevel;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+import static org.apache.solr.cloud.autoscaling.ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS;
+import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
+import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
+
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
+public class NodeAddedTriggerIntegrationTest extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ // Shared static state: TestTriggerAction is instantiated by name, so it talks to the test through these fields.
+ // Most of them are re-created in setupTest() before every test method.
+
+ // counted down in the TestTriggerAction constructor
+ private static CountDownLatch actionConstructorCalled;
+ // counted down in TestTriggerAction.init()
+ private static CountDownLatch actionInitCalled;
+ // counted down by TestTriggerAction.process() the first time an event is processed
+ private static CountDownLatch triggerFiredLatch;
+ // waitFor period (seconds) that TestTriggerAction.process() checks event delivery against
+ private static int waitForSeconds = 1;
+ // flipped to true by the first processed event; a second event makes the action fail
+ private static AtomicBoolean triggerFired;
+ // events seen by TestTriggerAction.process()
+ private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
+ // assigned in setupTest() from the Jetty runner at index 0
+ private static SolrCloudManager cloudManager;
+
+  /**
+   * Configures a two-node cluster with the {@code cloud-minimal} configset and suspends the
+   * {@code .scheduled_maintenance} trigger through the autoscaling API.
+   *
+   * @throws Exception if the cluster cannot be configured or the suspend request fails
+   */
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(2)
+        .addConfig("conf", configset("cloud-minimal"))
+        .configure();
+    // disable .scheduled_maintenance
+    String suspendTriggerCommand = "{" +
+        "'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
+        "}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+    SolrClient solrClient = cluster.getSolrClient();
+    NamedList<Object> response = solrClient.request(req);
+    // assertEquals takes (expected, actual); String.valueOf turns a missing "result" into a
+    // readable assertion failure instead of a NullPointerException
+    assertEquals("success", String.valueOf(response.get("result")));
+  }
+
+ private static CountDownLatch getTriggerFiredLatch() {
+ return triggerFiredLatch;
+ }
+
+ /**
+ * Per-test reset. In order: brings the cluster back to two running Jetty nodes, removes the scheduled
+ * triggers held by the Overseer node, overwrites the persisted autoscaling config, deletes all
+ * collections, stops the node that was Overseer leader, re-creates the static latches and flags,
+ * starts nodes until two are running again, and clears autoscaling events and markers.
+ * The steps depend on each other's side effects, so their order matters.
+ */
+ @Before
+ public void setupTest() throws Exception {
+ // ensure that exactly 2 jetty nodes are running
+ int numJetties = cluster.getJettySolrRunners().size();
+ log.info("Found {} jetty instances running", numJetties);
+ // stop one randomly chosen node per iteration until only two remain
+ for (int i = 2; i < numJetties; i++) {
+ int r = random().nextInt(cluster.getJettySolrRunners().size());
+ log.info("Shutdown extra jetty instance at port {}", cluster.getJettySolrRunner(r).getLocalPort());
+ cluster.stopJettySolrRunner(r);
+ }
+ for (int i = cluster.getJettySolrRunners().size(); i < 2; i++) {
+ // start jetty instances
+ cluster.startJettySolrRunner();
+ }
+ // NOTE(review): the argument is presumably a timeout in seconds -- confirm
+ cluster.waitForAllNodes(5);
+
+ // locate the Jetty runner whose node name matches the reported Overseer leader
+ NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+ String overseerLeader = (String) overSeerStatus.get("leader");
+ // stays 0 when no runner matches the reported leader
+ int overseerLeaderIndex = 0;
+ for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+ JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+ if (jetty.getNodeName().equals(overseerLeader)) {
+ overseerLeaderIndex = i;
+ break;
+ }
+ }
+ Overseer overseer = cluster.getJettySolrRunner(overseerLeaderIndex).getCoreContainer().getZkController().getOverseer();
+ ScheduledTriggers scheduledTriggers = ((OverseerTriggerThread) overseer.getTriggerThread().getThread()).getScheduledTriggers();
+ // aggressively remove all active scheduled triggers
+ scheduledTriggers.removeAll();
+
+ // clear any persisted auto scaling configuration
+ Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
+ log.info(SOLR_AUTOSCALING_CONF_PATH + " reset, new znode version {}", stat.getVersion());
+
+ cluster.deleteAllCollections();
+ cluster.getSolrClient().setDefaultCollection(null);
+
+ // restart Overseer. Even though we reset the autoscaling config some already running
+ // trigger threads may still continue to execute and produce spurious events
+ cluster.stopJettySolrRunner(overseerLeaderIndex);
+ // fixed 5 second pause after stopping the Overseer node
+ // NOTE(review): presumably gives the cluster time to elect a new Overseer -- confirm
+ Thread.sleep(5000);
+
+ // fresh per-test state; waitFor is randomized to 1, 2 or 3 seconds
+ waitForSeconds = 1 + random().nextInt(3);
+ actionConstructorCalled = new CountDownLatch(1);
+ actionInitCalled = new CountDownLatch(1);
+ triggerFiredLatch = new CountDownLatch(1);
+ triggerFired = new AtomicBoolean(false);
+ events.clear();
+
+ while (cluster.getJettySolrRunners().size() < 2) {
+ // perhaps a test stopped a node but didn't start it back
+ // lets start a node
+ cluster.startJettySolrRunner();
+ }
+
+ // taken from the runner at index 0, after the node count has been restored
+ cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
+ // clear any events or markers
+ // todo: consider the impact of such cleanup on regular cluster restarts
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
+ }
+
+ private void deleteChildrenRecursively(String path) throws Exception {
+ cloudManager.getDistribStateManager().removeRecursively(path, true, false);
+ }
+
+  /**
+   * Checks that a nodeAdded trigger which is replaced while it is still waiting out its
+   * {@code waitFor} period hands the pending node over to its replacement: a trigger is set with a
+   * 5s waitFor, a node is started, the trigger is re-set with a 0s waitFor, and the resulting event
+   * must name the new node.
+   */
+  @Test
+  public void testNodeAddedTriggerRestoreState() throws Exception {
+    // for this test we want to update the trigger so we must assert that the actions were created twice
+    final int expectedActionInits = 2;
+    actionInitCalled = new CountDownLatch(expectedActionInits);
+
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    waitForSeconds = 5;
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_restore_trigger'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '5s'," + // should be enough for us to update the trigger
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals("success", String.valueOf(response.get("result")));
+
+    // Wait for the first action instance to be initialised, i.e. for the latch to drop below its
+    // initial count. Comparing against 0 here would skip the wait entirely and make the assertion
+    // below trivially true, because the latch starts at 2.
+    TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS, cloudManager.getTimeSource());
+    while (actionInitCalled.getCount() == expectedActionInits && !timeOut.hasTimedOut()) {
+      Thread.sleep(200);
+    }
+    assertTrue("The action specified in node_added_restore_trigger was not instantiated even after 2 seconds",
+        actionInitCalled.getCount() < expectedActionInits);
+
+    // start a new node
+    JettySolrRunner newNode = cluster.startJettySolrRunner();
+
+    // ensure that the old trigger sees the new node, todo find a better way to do this
+    Thread.sleep(500 + TimeUnit.SECONDS.toMillis(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS));
+
+    waitForSeconds = 0;
+    setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_restore_trigger'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '0s'," + // update a property so that it replaces the old trigger, also we want it to fire immediately
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals("success", String.valueOf(response.get("result")));
+
+    // wait until the second instance of action is created
+    if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
+      fail("Two TriggerAction instances should have been created by now");
+    }
+
+    boolean await = triggerFiredLatch.await(5, TimeUnit.SECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
+    assertNotNull(nodeAddedEvent);
+    @SuppressWarnings("unchecked") // the event property is stored untyped
+    List<String> nodeNames = (List<String>) nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
+    assertTrue(nodeNames.contains(newNode.getNodeName()));
+  }
+
+  /**
+   * Sets a nodeAdded trigger, starts a node and checks that exactly that node is reported by the
+   * fired event. It then re-submits the identical trigger definition and checks that an action is
+   * constructed again but its {@code init} is not called.
+   */
+  @Test
+  public void testNodeAddedTrigger() throws Exception {
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    // waitForSeconds is not modified in this test, so this command can be re-sent unchanged later
+    final String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals("success", String.valueOf(response.get("result")));
+
+    if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    JettySolrRunner newNode = cluster.startJettySolrRunner();
+    boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+    NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) events.iterator().next();
+    assertNotNull(nodeAddedEvent);
+    @SuppressWarnings("unchecked") // the event property is stored untyped
+    List<String> nodeNames = (List<String>) nodeAddedEvent.getProperty(TriggerEvent.NODE_NAMES);
+    assertTrue(nodeNames.contains(newNode.getNodeName()));
+
+    // reset
+    actionConstructorCalled = new CountDownLatch(1);
+    actionInitCalled = new CountDownLatch(1);
+
+    // update the trigger with exactly the same data
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals("success", String.valueOf(response.get("result")));
+
+    // this should be a no-op so the action should have been created but init should not be called
+    if (!actionConstructorCalled.await(3, TimeUnit.SECONDS)) {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    assertFalse(actionInitCalled.await(2, TimeUnit.SECONDS));
+  }
+
+  /**
+   * Action wired into the triggers under test. It reports construction, initialisation and event
+   * processing back to the test through the enclosing class's static latches and flags.
+   */
+  public static class TestTriggerAction extends TriggerActionBase {
+
+    public TestTriggerAction() {
+      actionConstructorCalled.countDown();
+    }
+
+    /**
+     * Accepts only the first event: records it, fails if it arrived before the waitFor period
+     * (minus {@code WAIT_FOR_DELTA_NANOS}) elapsed, and releases the trigger-fired latch. Any
+     * further event fails the action.
+     */
+    @Override
+    public void process(TriggerEvent event, ActionContext actionContext) {
+      try {
+        final boolean firstEvent = triggerFired.compareAndSet(false, true);
+        if (!firstEvent) {
+          fail(event.source + " was fired more than once!");
+        } else {
+          events.add(event);
+          final long nowNanos = TriggerIntegrationTest.timeSource.getTimeNs();
+          final long elapsedNanos = nowNanos - event.getEventTime();
+          final long minimumDelayNanos = TimeUnit.SECONDS.toNanos(waitForSeconds) - WAIT_FOR_DELTA_NANOS;
+          if (elapsedNanos <= minimumDelayNanos) {
+            fail(event.source + " was fired before the configured waitFor period");
+          }
+          getTriggerFiredLatch().countDown();
+        }
+      } catch (Throwable failure) {
+        // record the failure in the debug log before propagating it unchanged
+        log.debug("--throwable", failure);
+        throw failure;
+      }
+    }
+
+    @Override
+    public void init(Map<String, String> args) {
+      log.info("TestTriggerAction init");
+      actionInitCalled.countDown();
+      super.init(args);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8533a46c/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerIntegrationTest.java
new file mode 100644
index 0000000..6b1af65
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerIntegrationTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.LogLevel;
+import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.data.Stat;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+import static org.apache.solr.cloud.autoscaling.ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS;
+import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
+import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
+
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
+public class NodeLostTriggerIntegrationTest extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ // Shared static state: TestTriggerAction is instantiated by name, so it talks to the test through these fields.
+ // Most of them are re-created in setupTest() before every test method.
+
+ // counted down in the TestTriggerAction constructor
+ private static CountDownLatch actionConstructorCalled;
+ // counted down in TestTriggerAction.init()
+ private static CountDownLatch actionInitCalled;
+ // counted down by TestTriggerAction.process() the first time an event is processed
+ private static CountDownLatch triggerFiredLatch;
+ // waitFor period (seconds) that TestTriggerAction.process() checks event delivery against
+ private static int waitForSeconds = 1;
+ // flipped to true by the first processed event; a second event makes the action fail
+ private static AtomicBoolean triggerFired;
+ // events seen by TestTriggerAction.process()
+ private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
+ // assigned in setupTest() from the Jetty runner at index 0
+ private static SolrCloudManager cloudManager;
+
+  /**
+   * Configures a two-node cluster with the {@code cloud-minimal} configset and suspends the
+   * {@code .scheduled_maintenance} trigger through the autoscaling API.
+   *
+   * @throws Exception if the cluster cannot be configured or the suspend request fails
+   */
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(2)
+        .addConfig("conf", configset("cloud-minimal"))
+        .configure();
+    // disable .scheduled_maintenance
+    String suspendTriggerCommand = "{" +
+        "'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
+        "}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+    SolrClient solrClient = cluster.getSolrClient();
+    NamedList<Object> response = solrClient.request(req);
+    // assertEquals takes (expected, actual); String.valueOf turns a missing "result" into a
+    // readable assertion failure instead of a NullPointerException
+    assertEquals("success", String.valueOf(response.get("result")));
+  }
+
+ private static CountDownLatch getTriggerFiredLatch() {
+ return triggerFiredLatch;
+ }
+
+ /**
+ * Per-test reset. In order: brings the cluster back to two running Jetty nodes, removes the scheduled
+ * triggers held by the Overseer node, overwrites the persisted autoscaling config, deletes all
+ * collections, stops the node that was Overseer leader, re-creates the static latches and flags,
+ * starts nodes until two are running again, and clears autoscaling events and markers.
+ * The steps depend on each other's side effects, so their order matters.
+ */
+ @Before
+ public void setupTest() throws Exception {
+ // ensure that exactly 2 jetty nodes are running
+ int numJetties = cluster.getJettySolrRunners().size();
+ log.info("Found {} jetty instances running", numJetties);
+ // stop one randomly chosen node per iteration until only two remain
+ for (int i = 2; i < numJetties; i++) {
+ int r = random().nextInt(cluster.getJettySolrRunners().size());
+ log.info("Shutdown extra jetty instance at port {}", cluster.getJettySolrRunner(r).getLocalPort());
+ cluster.stopJettySolrRunner(r);
+ }
+ for (int i = cluster.getJettySolrRunners().size(); i < 2; i++) {
+ // start jetty instances
+ cluster.startJettySolrRunner();
+ }
+ // NOTE(review): the argument is presumably a timeout in seconds -- confirm
+ cluster.waitForAllNodes(5);
+
+ // locate the Jetty runner whose node name matches the reported Overseer leader
+ NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+ String overseerLeader = (String) overSeerStatus.get("leader");
+ // stays 0 when no runner matches the reported leader
+ int overseerLeaderIndex = 0;
+ for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+ JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+ if (jetty.getNodeName().equals(overseerLeader)) {
+ overseerLeaderIndex = i;
+ break;
+ }
+ }
+ Overseer overseer = cluster.getJettySolrRunner(overseerLeaderIndex).getCoreContainer().getZkController().getOverseer();
+ ScheduledTriggers scheduledTriggers = ((OverseerTriggerThread) overseer.getTriggerThread().getThread()).getScheduledTriggers();
+ // aggressively remove all active scheduled triggers
+ scheduledTriggers.removeAll();
+
+ // clear any persisted auto scaling configuration
+ Stat stat = zkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(new ZkNodeProps()), true);
+ log.info(SOLR_AUTOSCALING_CONF_PATH + " reset, new znode version {}", stat.getVersion());
+
+ cluster.deleteAllCollections();
+ cluster.getSolrClient().setDefaultCollection(null);
+
+ // restart Overseer. Even though we reset the autoscaling config some already running
+ // trigger threads may still continue to execute and produce spurious events
+ cluster.stopJettySolrRunner(overseerLeaderIndex);
+ // fixed 5 second pause after stopping the Overseer node
+ // NOTE(review): presumably gives the cluster time to elect a new Overseer -- confirm
+ Thread.sleep(5000);
+
+ // fresh per-test state; waitFor is randomized to 1, 2 or 3 seconds
+ waitForSeconds = 1 + random().nextInt(3);
+ actionConstructorCalled = new CountDownLatch(1);
+ actionInitCalled = new CountDownLatch(1);
+ triggerFiredLatch = new CountDownLatch(1);
+ triggerFired = new AtomicBoolean(false);
+ events.clear();
+
+ while (cluster.getJettySolrRunners().size() < 2) {
+ // perhaps a test stopped a node but didn't start it back
+ // lets start a node
+ cluster.startJettySolrRunner();
+ }
+
+ // taken from the runner at index 0, after the node count has been restored
+ cloudManager = cluster.getJettySolrRunner(0).getCoreContainer().getZkController().getSolrCloudManager();
+ // clear any events or markers
+ // todo: consider the impact of such cleanup on regular cluster restarts
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH);
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
+ deleteChildrenRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
+ }
+
+ private void deleteChildrenRecursively(String path) throws Exception {
+ cloudManager.getDistribStateManager().removeRecursively(path, true, false);
+ }
+
+  /**
+   * Checks that a nodeLost trigger which is replaced while it is still waiting out its
+   * {@code waitFor} period hands the pending node over to its replacement: a trigger is set with a
+   * 5s waitFor, a freshly started node is stopped, the trigger is re-set with a 0s waitFor, and the
+   * resulting event must name the stopped node.
+   */
+  @Test
+  public void testNodeLostTriggerRestoreState() throws Exception {
+    // for this test we want to update the trigger so we must assert that the actions were created twice
+    final int expectedActionInits = 2;
+    actionInitCalled = new CountDownLatch(expectedActionInits);
+
+    // start a new node
+    JettySolrRunner newNode = cluster.startJettySolrRunner();
+    String nodeName = newNode.getNodeName();
+
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    waitForSeconds = 5;
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_lost_restore_trigger'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '5s'," + // should be enough for us to update the trigger
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals("success", String.valueOf(response.get("result")));
+
+    // Wait for the first action instance to be initialised, i.e. for the latch to drop below its
+    // initial count. Comparing against 0 here would skip the wait entirely and make the assertion
+    // below trivially true, because the latch starts at 2.
+    TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS, cloudManager.getTimeSource());
+    while (actionInitCalled.getCount() == expectedActionInits && !timeOut.hasTimedOut()) {
+      Thread.sleep(200);
+    }
+    assertTrue("The action specified in node_lost_restore_trigger was not instantiated even after 2 seconds",
+        actionInitCalled.getCount() < expectedActionInits);
+
+    // find the position of the node started above so that it can be stopped by index
+    List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
+    int index = -1;
+    for (int i = 0; i < jettySolrRunners.size(); i++) {
+      JettySolrRunner runner = jettySolrRunners.get(i);
+      if (runner == newNode) index = i;
+    }
+    assertFalse(index == -1);
+    cluster.stopJettySolrRunner(index);
+
+    // ensure that the old trigger sees the stopped node, todo find a better way to do this
+    Thread.sleep(500 + TimeUnit.SECONDS.toMillis(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS));
+
+    waitForSeconds = 0;
+    setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_lost_restore_trigger'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '0s'," + // update a property so that it replaces the old trigger, also we want it to fire immediately
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals("success", String.valueOf(response.get("result")));
+
+    // wait until the second instance of action is created
+    if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
+      fail("Two TriggerAction instances should have been created by now");
+    }
+
+    boolean await = triggerFiredLatch.await(5, TimeUnit.SECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+    NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) events.iterator().next();
+    assertNotNull(nodeLostEvent);
+    @SuppressWarnings("unchecked") // the event property is stored untyped
+    List<String> nodeNames = (List<String>) nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
+    assertTrue(nodeNames.contains(nodeName));
+  }
+
+  /**
+   * Sets a nodeLost trigger, stops a node that is not the Overseer leader and checks that exactly
+   * that node is reported by the fired event. It then re-submits the identical trigger definition
+   * and checks that an action is constructed again but its {@code init} is not called.
+   */
+  @Test
+  public void testNodeLostTrigger() throws Exception {
+    CloudSolrClient solrClient = cluster.getSolrClient();
+    // waitForSeconds is not modified in this test, so this command can be re-sent unchanged later
+    final String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_lost_trigger'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '" + waitForSeconds + "s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+        "}}";
+    NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+    String overseerLeader = (String) overSeerStatus.get("leader");
+    // the last runner that is not the Overseer leader wins; stays 0 if every runner matches
+    int nonOverseerLeaderIndex = 0;
+    for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+      JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+      if (!jetty.getNodeName().equals(overseerLeader)) {
+        nonOverseerLeaderIndex = i;
+      }
+    }
+    SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals("success", String.valueOf(response.get("result")));
+
+    if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    triggerFired.set(false);
+    triggerFiredLatch = new CountDownLatch(1);
+    String lostNodeName = cluster.getJettySolrRunner(nonOverseerLeaderIndex).getNodeName();
+    cluster.stopJettySolrRunner(nonOverseerLeaderIndex);
+    boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+    assertTrue("The trigger did not fire at all", await);
+    assertTrue(triggerFired.get());
+    NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) events.iterator().next();
+    assertNotNull(nodeLostEvent);
+    @SuppressWarnings("unchecked") // the event property is stored untyped
+    List<String> nodeNames = (List<String>) nodeLostEvent.getProperty(TriggerEvent.NODE_NAMES);
+    assertTrue(nodeNames.contains(lostNodeName));
+
+    // reset
+    actionConstructorCalled = new CountDownLatch(1);
+    actionInitCalled = new CountDownLatch(1);
+
+    // update the trigger with exactly the same data
+    req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals("success", String.valueOf(response.get("result")));
+
+    // this should be a no-op so the action should have been created but init should not be called
+    if (!actionConstructorCalled.await(3, TimeUnit.SECONDS)) {
+      fail("The TriggerAction should have been created by now");
+    }
+
+    assertFalse(actionInitCalled.await(2, TimeUnit.SECONDS));
+  }
+
+  /**
+   * Action wired into the triggers under test. It reports construction, initialisation and event
+   * processing back to the test through the enclosing class's static latches and flags.
+   */
+  public static class TestTriggerAction extends TriggerActionBase {
+
+    public TestTriggerAction() {
+      actionConstructorCalled.countDown();
+    }
+
+    /**
+     * Accepts only the first event: records it, fails if it arrived before the waitFor period
+     * (minus {@code WAIT_FOR_DELTA_NANOS}) elapsed, and releases the trigger-fired latch. Any
+     * further event fails the action.
+     */
+    @Override
+    public void process(TriggerEvent event, ActionContext actionContext) {
+      try {
+        final boolean firstEvent = triggerFired.compareAndSet(false, true);
+        if (!firstEvent) {
+          fail(event.source + " was fired more than once!");
+        } else {
+          events.add(event);
+          final long nowNanos = TriggerIntegrationTest.timeSource.getTimeNs();
+          final long elapsedNanos = nowNanos - event.getEventTime();
+          final long minimumDelayNanos = TimeUnit.SECONDS.toNanos(waitForSeconds) - WAIT_FOR_DELTA_NANOS;
+          if (elapsedNanos <= minimumDelayNanos) {
+            fail(event.source + " was fired before the configured waitFor period");
+          }
+          getTriggerFiredLatch().countDown();
+        }
+      } catch (Throwable failure) {
+        // record the failure in the debug log before propagating it unchanged
+        log.debug("--throwable", failure);
+        throw failure;
+      }
+    }
+
+    @Override
+    public void init(Map<String, String> args) {
+      log.info("TestTriggerAction init");
+      actionInitCalled.countDown();
+      super.init(args);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8533a46c/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeMarkersRegistrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeMarkersRegistrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeMarkersRegistrationTest.java
new file mode 100644
index 0000000..38c2165
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeMarkersRegistrationTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.LiveNodesListener;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.util.LogLevel;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
+@LuceneTestCase.BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-12028")
+public class NodeMarkersRegistrationTest extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static CountDownLatch actionInitCalled;
+ private static CountDownLatch triggerFiredLatch;
+ private static CountDownLatch actionConstructorCalled;
+ private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
+ private static ZkStateReader zkStateReader;
+ private static ReentrantLock lock = new ReentrantLock();
+
+ /**
+  * Configures a two-node cluster with the "cloud-minimal" configset, caches the
+  * client's {@link ZkStateReader} and suspends the '.scheduled_maintenance' trigger.
+  *
+  * @throws Exception if the cluster cannot be configured or the request fails
+  */
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+   configureCluster(2)
+       .addConfig("conf", configset("cloud-minimal"))
+       .configure();
+   zkStateReader = cluster.getSolrClient().getZkStateReader();
+   // disable .scheduled_maintenance
+   String suspendTriggerCommand = "{" +
+       "'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
+       "}";
+   SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+   SolrClient solrClient = cluster.getSolrClient();
+   NamedList<Object> response = solrClient.request(req);
+   // expected value goes first so a failure message reads correctly
+   assertEquals("success", response.get("result").toString());
+ }
+
+ /**
+  * Returns the latch currently stored in {@code triggerFiredLatch}. The field is
+  * reassigned during the test, so callers get whichever instance is current.
+  */
+ private static CountDownLatch getTriggerFiredLatch() {
+   return triggerFiredLatch;
+ }
+
+ /**
+ * Checks the nodeAdded / nodeLost marker znodes in three phases:
+ * without any triggers no marker may exist after adding a node or killing the overseer;
+ * with both triggers configured, a nodeAdded marker is created and later removed;
+ * killing the overseer again must produce exactly one NODELOST event for the old leader.
+ * NOTE(review): actionInitCalled and actionConstructorCalled are created here and counted
+ * down by TestEventMarkerAction, but nothing in this method awaits them.
+ */
+ @Test
+ public void testNodeMarkersRegistration() throws Exception {
+ // for this test we want to create two triggers so we must assert that the actions were created twice
+ actionInitCalled = new CountDownLatch(2);
+ // similarly we want both triggers to fire
+ triggerFiredLatch = new CountDownLatch(2);
+ actionConstructorCalled = new CountDownLatch(1);
+ TestLiveNodesListener listener = registerLiveNodesListener();
+
+ // find the index of the Jetty runner hosting the overseer leader (stays 0 if none matches)
+ NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+ String overseerLeader = (String) overSeerStatus.get("leader");
+ int overseerLeaderIndex = 0;
+ for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+ JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+ if (jetty.getNodeName().equals(overseerLeader)) {
+ overseerLeaderIndex = i;
+ break;
+ }
+ }
+ // add a node
+ JettySolrRunner node = cluster.startJettySolrRunner();
+ if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+ fail("onChange listener didn't execute on cluster change");
+ }
+ assertEquals(1, listener.addedNodes.size());
+ assertEquals(node.getNodeName(), listener.addedNodes.iterator().next());
+ // verify that a znode doesn't exist (no trigger)
+ String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node.getNodeName();
+ assertFalse("Path " + pathAdded + " was created but there are no nodeAdded triggers", zkClient().exists(pathAdded, true));
+ listener.reset();
+ // stop overseer
+ log.info("====== KILL OVERSEER 1");
+ cluster.stopJettySolrRunner(overseerLeaderIndex);
+ if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+ fail("onChange listener didn't execute on cluster change");
+ }
+ assertEquals(1, listener.lostNodes.size());
+ assertEquals(overseerLeader, listener.lostNodes.iterator().next());
+ assertEquals(0, listener.addedNodes.size());
+ // wait until the new overseer is up
+ Thread.sleep(5000);
+ // verify that a znode does NOT exist - there's no nodeLost trigger,
+ // so the new overseer cleaned up existing nodeLost markers
+ String pathLost = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + overseerLeader;
+ assertFalse("Path " + pathLost + " exists", zkClient().exists(pathLost, true));
+
+ listener.reset();
+
+ // set up triggers
+ CloudSolrClient solrClient = cluster.getSolrClient();
+
+ log.info("====== ADD TRIGGERS");
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_added_triggerMR'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor' : '1s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_lost_triggerMR'," +
+ "'event' : 'nodeLost'," +
+ "'waitFor' : '1s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + TestEventMarkerAction.class.getName() + "'}]" +
+ "}}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // the first overseer was stopped above, so look up the current leader and its runner index again
+ overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+ overseerLeader = (String) overSeerStatus.get("leader");
+ overseerLeaderIndex = 0;
+ for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+ JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+ if (jetty.getNodeName().equals(overseerLeader)) {
+ overseerLeaderIndex = i;
+ break;
+ }
+ }
+
+ // create another node
+ log.info("====== ADD NODE 1");
+ JettySolrRunner node1 = cluster.startJettySolrRunner();
+ if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+ fail("onChange listener didn't execute on cluster change");
+ }
+ assertEquals(1, listener.addedNodes.size());
+ assertEquals(node1.getNodeName(), listener.addedNodes.iterator().next());
+ // verify that a znode exists
+ pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node1.getNodeName();
+ assertTrue("Path " + pathAdded + " wasn't created", zkClient().exists(pathAdded, true));
+
+ Thread.sleep(5000);
+ // nodeAdded marker should be consumed now by nodeAdded trigger
+ assertFalse("Path " + pathAdded + " should have been deleted", zkClient().exists(pathAdded, true));
+
+ listener.reset();
+ events.clear();
+ // fresh single-count latch: only the event awaited below (asserted to be NODELOST) is expected
+ triggerFiredLatch = new CountDownLatch(1);
+ // kill overseer again
+ log.info("====== KILL OVERSEER 2");
+ cluster.stopJettySolrRunner(overseerLeaderIndex);
+ if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+ fail("onChange listener didn't execute on cluster change");
+ }
+
+
+ if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
+ fail("Trigger should have fired by now");
+ }
+ assertEquals(1, events.size());
+ TriggerEvent ev = events.iterator().next();
+ List<String> nodeNames = (List<String>) ev.getProperty(TriggerEvent.NODE_NAMES);
+ assertTrue(nodeNames.contains(overseerLeader));
+ assertEquals(TriggerEventType.NODELOST, ev.getEventType());
+ }
+
+ private TestLiveNodesListener registerLiveNodesListener() {
+ TestLiveNodesListener listener = new TestLiveNodesListener();
+ zkStateReader.registerLiveNodesListener(listener);
+ return listener;
+ }
+
+ /**
+  * Records which nodes disappeared from and which appeared in the live-nodes set on each
+  * {@code onChange} call, and releases {@code onChangeLatch} once a change has been recorded.
+  * The test thread awaits the latch and then inspects {@code lostNodes} / {@code addedNodes};
+  * {@link #reset()} clears both sets and installs a new latch for the next change.
+  * NOTE(review): onChange is presumably delivered on a thread other than the test thread
+  * (the latch only makes sense in that case), hence the concurrent sets and volatile latch.
+  */
+ private static class TestLiveNodesListener implements LiveNodesListener {
+   Set<String> lostNodes = ConcurrentHashMap.newKeySet();
+   Set<String> addedNodes = ConcurrentHashMap.newKeySet();
+   // volatile: reset() swaps the latch on one thread while onChange reads it on another
+   volatile CountDownLatch onChangeLatch = new CountDownLatch(1);
+
+   /** Forgets all recorded nodes and arms a new single-count latch. */
+   public void reset() {
+     lostNodes.clear();
+     addedNodes.clear();
+     onChangeLatch = new CountDownLatch(1);
+   }
+
+   /**
+    * @param oldLiveNodes live nodes before the change
+    * @param newLiveNodes live nodes after the change; not modified by this method
+    */
+   @Override
+   public void onChange(SortedSet<String> oldLiveNodes, SortedSet<String> newLiveNodes) {
+     Set<String> lost = new HashSet<>(oldLiveNodes);
+     lost.removeAll(newLiveNodes);
+     lostNodes.addAll(lost);
+
+     // diff on a private copy so the set passed in by the caller is left untouched
+     Set<String> added = new HashSet<>(newLiveNodes);
+     added.removeAll(oldLiveNodes);
+     addedNodes.addAll(added);
+
+     // signal last: a waiter released by the latch must see fully populated sets
+     onChangeLatch.countDown();
+   }
+ }
+
+ /**
+  * Trigger action used by testNodeMarkersRegistration. The constructor and {@code init}
+  * count down their respective latches; {@code process} stores the event and counts down
+  * the current trigger-fired latch while holding {@code lock}.
+  */
+ public static class TestEventMarkerAction extends TriggerActionBase {
+
+   public TestEventMarkerAction() {
+     actionConstructorCalled.countDown();
+   }
+
+   @Override
+   public void init(Map<String, String> args) {
+     log.info("TestEventMarkerAction init");
+     actionInitCalled.countDown();
+     super.init(args);
+   }
+
+   @Override
+   public void process(TriggerEvent event, ActionContext actionContext) {
+     if (lock.tryLock()) {
+       try {
+         events.add(event);
+         getTriggerFiredLatch().countDown();
+       } catch (Throwable t) {
+         log.debug("--throwable", t);
+         throw t;
+       } finally {
+         lock.unlock();
+       }
+     } else {
+       // lock already held: the event is dropped and only a log line is written
+       log.info("We should never have a tryLock fail because actions are never supposed to be executed concurrently");
+     }
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8533a46c/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledTriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledTriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledTriggerIntegrationTest.java
new file mode 100644
index 0000000..24e7420
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledTriggerIntegrationTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.util.LogLevel;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+
+/**
+ * Integration test for {@link ScheduledTrigger}
+ */
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
+@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 26-Mar-2018
+public class ScheduledTriggerIntegrationTest extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static CountDownLatch triggerFiredLatch;
+ private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
+ private static AtomicReference<Map<String, Object>> actionContextPropertiesRef = new AtomicReference<>();
+
+ /**
+  * Configures a two-node cluster with the "cloud-minimal" configset, suspends the
+  * '.scheduled_maintenance' trigger and creates the single-count trigger-fired latch.
+  *
+  * @throws Exception if the cluster cannot be configured or the request fails
+  */
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+   configureCluster(2)
+       .addConfig("conf", configset("cloud-minimal"))
+       .configure();
+   // disable .scheduled_maintenance
+   String suspendTriggerCommand = "{" +
+       "'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
+       "}";
+   SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+   SolrClient solrClient = cluster.getSolrClient();
+   NamedList<Object> response = solrClient.request(req);
+   // expected value goes first so a failure message reads correctly
+   assertEquals("success", response.get("result").toString());
+   triggerFiredLatch = new CountDownLatch(1);
+ }
+
+ /**
+  * Creates a 1-shard / 3-replica collection on two nodes, installs a cluster policy of
+  * fewer than 2 cores per node, starts a third node and registers a scheduled trigger
+  * (every 3 seconds) with compute, execute and recorder actions. The test then expects
+  * one recorded event whose single computed operation targets the newly started node.
+  */
+ @Test
+ public void testScheduledTrigger() throws Exception {
+   CloudSolrClient solrClient = cluster.getSolrClient();
+
+   // this collection will place 2 cores on 1st node and 1 core on 2nd node
+   String collectionName = "testScheduledTrigger";
+   CollectionAdminRequest.createCollection(collectionName, 1, 3)
+       .setMaxShardsPerNode(5).process(solrClient);
+   waitForState("", collectionName, clusterShape(1, 3));
+
+   // create a policy which allows only 1 core per node thereby creating a violation for the above collection
+   String setClusterPolicy = "{\n" +
+       " \"set-cluster-policy\" : [\n" +
+       " {\"cores\" : \"<2\", \"node\" : \"#EACH\"}\n" +
+       " ]\n" +
+       "}";
+   SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setClusterPolicy);
+   NamedList<Object> response = solrClient.request(req);
+   assertEquals("success", response.get("result").toString());
+
+   // start a new node which can be used to balance the cluster as per policy
+   JettySolrRunner newNode = cluster.startJettySolrRunner();
+   cluster.waitForAllNodes(10);
+
+   // the properties are comma-separated like in every other set-trigger command;
+   // previously 'startTime', 'every' and 'actions' were concatenated without separators
+   String setTriggerCommand = "{" +
+       "'set-trigger' : {" +
+       "'name' : 'sched_trigger_integration1'," +
+       "'event' : 'scheduled'," +
+       "'startTime' : '" + new Date().toInstant().toString() + "'," +
+       "'every' : '+3SECONDS'," +
+       "'actions' : [" +
+       "{'name' : 'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
+       "{'name' : 'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
+       "{'name' : 'recorder', 'class': '" + ContextPropertiesRecorderAction.class.getName() + "'}" +
+       "]}}";
+   req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+   response = solrClient.request(req);
+   assertEquals("success", response.get("result").toString());
+
+   assertTrue("ScheduledTrigger did not fire within 20 seconds", triggerFiredLatch.await(20, TimeUnit.SECONDS));
+   assertEquals(1, events.size());
+   Map<String, Object> actionContextProps = actionContextPropertiesRef.get();
+   assertNotNull(actionContextProps);
+   // "operations" is read from the context captured by the recorder action
+   List<SolrRequest> operations = (List<SolrRequest>) actionContextProps.get("operations");
+   assertNotNull(operations);
+   assertEquals(1, operations.size());
+   for (SolrRequest operation : operations) {
+     SolrParams params = operation.getParams();
+     assertEquals(newNode.getNodeName(), params.get("targetNode"));
+   }
+ }
+
+ public static class ContextPropertiesRecorderAction extends TriggerActionBase {
+ @Override
+ public void process(TriggerEvent event, ActionContext actionContext) {
+ actionContextPropertiesRef.set(actionContext.getProperties());
+ try {
+ events.add(event);
+ triggerFiredLatch.countDown();
+ } catch (Throwable t) {
+ log.debug("--throwable", t);
+ throw t;
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8533a46c/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java
new file mode 100644
index 0000000..547be5c
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/SearchRateTriggerIntegrationTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.AtomicDouble;
+import org.apache.lucene.util.LuceneTestCase;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.util.LogLevel;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
+import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.timeSource;
+
+/**
+ * Integration test for {@link SearchRateTrigger}
+ */
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
+@LuceneTestCase.BadApple(bugUrl = "https://issues.apache.org/jira/browse/SOLR-12028")
+public class SearchRateTriggerIntegrationTest extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static CountDownLatch triggerFiredLatch = new CountDownLatch(1);
+ private static CountDownLatch listenerCreated = new CountDownLatch(1);
+ private static int waitForSeconds = 1;
+ private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
+ private static Map<String, List<CapturedEvent>> listenerEvents = new HashMap<>();
+
+ /**
+  * Configures a five-node cluster with the "cloud-minimal" configset and suspends the
+  * '.scheduled_maintenance' trigger.
+  *
+  * @throws Exception if the cluster cannot be configured or the request fails
+  */
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+   configureCluster(5)
+       .addConfig("conf", configset("cloud-minimal"))
+       .configure();
+   // disable .scheduled_maintenance
+   String suspendTriggerCommand = "{" +
+       "'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
+       "}";
+   SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+   SolrClient solrClient = cluster.getSolrClient();
+   NamedList<Object> response = solrClient.request(req);
+   // expected value goes first so a failure message reads correctly
+   assertEquals("success", response.get("result").toString());
+ }
+
+ /**
+  * Registers a searchRate trigger (rate 1.0) with compute, execute and test actions plus a
+  * listener named 'srt', issues 500 queries against a 1-shard / 2-replica collection and
+  * then verifies the listener's captured stages, the waitFor delay, the per-node, per-replica,
+  * per-shard and per-collection rates of the event, and that the operations are ADDREPLICA.
+  */
+ @Test
+ public void testSearchRate() throws Exception {
+   CloudSolrClient solrClient = cluster.getSolrClient();
+   String COLL1 = "collection1";
+   CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLL1,
+       "conf", 1, 2);
+   create.process(solrClient);
+   String setTriggerCommand = "{" +
+       "'set-trigger' : {" +
+       "'name' : 'search_rate_trigger'," +
+       "'event' : 'searchRate'," +
+       "'waitFor' : '" + waitForSeconds + "s'," +
+       "'enabled' : true," +
+       "'rate' : 1.0," +
+       "'actions' : [" +
+       "{'name':'compute','class':'" + ComputePlanAction.class.getName() + "'}," +
+       "{'name':'execute','class':'" + ExecutePlanAction.class.getName() + "'}," +
+       "{'name':'test','class':'" + TestSearchRateAction.class.getName() + "'}" +
+       "]" +
+       "}}";
+   SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+   NamedList<Object> response = solrClient.request(req);
+   assertEquals("success", response.get("result").toString());
+
+   String setListenerCommand1 = "{" +
+       "'set-listener' : " +
+       "{" +
+       "'name' : 'srt'," +
+       "'trigger' : 'search_rate_trigger'," +
+       "'stage' : ['FAILED','SUCCEEDED']," +
+       "'afterAction': ['compute', 'execute', 'test']," +
+       "'class' : '" + TestTriggerListener.class.getName() + "'" +
+       "}" +
+       "}";
+   req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
+   response = solrClient.request(req);
+   assertEquals("success", response.get("result").toString());
+   SolrParams query = params(CommonParams.Q, "*:*");
+   for (int i = 0; i < 500; i++) {
+     solrClient.query(COLL1, query);
+   }
+   boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+   assertTrue("The trigger did not fire at all", await);
+   // wait for listener to capture the SUCCEEDED stage
+   Thread.sleep(5000);
+   // named differently from the static 'events' field to avoid shadowing it
+   List<CapturedEvent> capturedEvents = listenerEvents.get("srt");
+   assertNotNull("no events captured by listener 'srt': " + listenerEvents, capturedEvents);
+   assertEquals(listenerEvents.toString(), 4, capturedEvents.size());
+   assertEquals("AFTER_ACTION", capturedEvents.get(0).stage.toString());
+   assertEquals("compute", capturedEvents.get(0).actionName);
+   assertEquals("AFTER_ACTION", capturedEvents.get(1).stage.toString());
+   assertEquals("execute", capturedEvents.get(1).actionName);
+   assertEquals("AFTER_ACTION", capturedEvents.get(2).stage.toString());
+   assertEquals("test", capturedEvents.get(2).actionName);
+   assertEquals("SUCCEEDED", capturedEvents.get(3).stage.toString());
+   assertNull(capturedEvents.get(3).actionName);
+
+   CapturedEvent ev = capturedEvents.get(0);
+   long now = timeSource.getTimeNs();
+   // verify waitFor: convert the configured seconds TO nanoseconds (the reverse conversion
+   // truncates to 0 and makes the check vacuous), same formula as TestSearchRateAction
+   long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+   assertTrue(waitForNanos <= now - ev.event.getEventTime());
+   Map<String, Double> nodeRates = (Map<String, Double>) ev.event.getProperties().get("node");
+   assertNotNull("nodeRates", nodeRates);
+   assertTrue(nodeRates.toString(), nodeRates.size() > 0);
+   AtomicDouble totalNodeRate = new AtomicDouble();
+   nodeRates.forEach((n, r) -> totalNodeRate.addAndGet(r));
+   List<ReplicaInfo> replicaRates = (List<ReplicaInfo>) ev.event.getProperties().get("replica");
+   assertNotNull("replicaRates", replicaRates);
+   assertTrue(replicaRates.toString(), replicaRates.size() > 0);
+   AtomicDouble totalReplicaRate = new AtomicDouble();
+   replicaRates.forEach(r -> {
+     assertTrue(r.toString(), r.getVariable("rate") != null);
+     totalReplicaRate.addAndGet((Double) r.getVariable("rate"));
+   });
+   // "shard" maps collection name -> (shard name -> rate)
+   Map<String, Object> shardRates = (Map<String, Object>) ev.event.getProperties().get("shard");
+   assertNotNull("shardRates", shardRates);
+   assertEquals(shardRates.toString(), 1, shardRates.size());
+   shardRates = (Map<String, Object>) shardRates.get(COLL1);
+   assertNotNull("shardRates", shardRates);
+   assertEquals(shardRates.toString(), 1, shardRates.size());
+   AtomicDouble totalShardRate = new AtomicDouble();
+   shardRates.forEach((s, r) -> totalShardRate.addAndGet((Double) r));
+   Map<String, Double> collectionRates = (Map<String, Double>) ev.event.getProperties().get("collection");
+   assertNotNull("collectionRates", collectionRates);
+   assertEquals(collectionRates.toString(), 1, collectionRates.size());
+   Double collectionRate = collectionRates.get(COLL1);
+   assertNotNull(collectionRate);
+   assertTrue(collectionRate > 5.0);
+   // the aggregated rates must agree with the collection rate within a tolerance of 5.0
+   assertEquals(collectionRate, totalNodeRate.get(), 5.0);
+   assertEquals(collectionRate, totalShardRate.get(), 5.0);
+   assertEquals(collectionRate, totalReplicaRate.get(), 5.0);
+
+   // check operations
+   List<Map<String, Object>> ops = (List<Map<String, Object>>) ev.context.get("properties.operations");
+   assertNotNull(ops);
+   assertTrue(ops.size() > 1);
+   for (Map<String, Object> m : ops) {
+     assertEquals("ADDREPLICA", m.get("params.action"));
+   }
+ }
+
+ /**
+  * Trigger action that records every event it receives, fails if the event is processed
+  * before the configured waitFor period (minus the allowed delta) has passed, and then
+  * counts down the trigger-fired latch.
+  */
+ public static class TestSearchRateAction extends TriggerActionBase {
+
+   @Override
+   public void process(TriggerEvent event, ActionContext context) throws Exception {
+     try {
+       events.add(event);
+       // time between event creation and this action being invoked
+       long elapsedNanos = timeSource.getTimeNs() - event.getEventTime();
+       long requiredNanos = TimeUnit.SECONDS.toNanos(waitForSeconds) - WAIT_FOR_DELTA_NANOS;
+       if (elapsedNanos <= requiredNanos) {
+         fail(event.source + " was fired before the configured waitFor period");
+       }
+       triggerFiredLatch.countDown();
+     } catch (Throwable t) {
+       log.debug("--throwable", t);
+       throw t;
+     }
+   }
+ }
+
+ /**
+  * Trigger listener that counts down {@code listenerCreated} when initialised and appends
+  * a {@link CapturedEvent} for every callback to the {@code listenerEvents} list stored
+  * under this listener's configured name.
+  */
+ public static class TestTriggerListener extends TriggerListenerBase {
+   @Override
+   public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) {
+     super.init(cloudManager, config);
+     listenerCreated.countDown();
+   }
+
+   @Override
+   public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
+                                    ActionContext context, Throwable error, String message) {
+     // the per-listener list is created the first time this listener name is seen
+     listenerEvents
+         .computeIfAbsent(config.name, key -> new ArrayList<>())
+         .add(new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message));
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8533a46c/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerCooldownIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerCooldownIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerCooldownIntegrationTest.java
new file mode 100644
index 0000000..8d69bad
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerCooldownIntegrationTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.params.AutoScalingParams;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.util.LogLevel;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.WAIT_FOR_DELTA_NANOS;
+import static org.apache.solr.cloud.autoscaling.TriggerIntegrationTest.timeSource;
+
+ /**
+  * Checks that the listener events captured for successive runs of a nodeAdded trigger are more than
+  * one cooldown period apart, first with the default cooldown and then with a reconfigured one.
+  */
+ @LogLevel("org.apache.solr.cloud.autoscaling=DEBUG;org.apache.solr.client.solrj.cloud.autoscaling=DEBUG")
+ public class TriggerCooldownIntegrationTest extends SolrCloudTestCase {
+   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+   // Filled by TestTriggerListener and cleared by the test method, possibly on different threads.
+   static Map<String, List<CapturedEvent>> listenerEvents = new ConcurrentHashMap<>();
+   // Both latch fields are re-assigned by the test and counted down by the nested classes, hence volatile.
+   static volatile CountDownLatch listenerCreated = new CountDownLatch(1);
+   static boolean failDummyAction = false;
+   private static volatile CountDownLatch triggerFiredLatch = new CountDownLatch(1);
+   private static int waitForSeconds = 1;
+   private static AtomicBoolean triggerFired = new AtomicBoolean();
+   private static Set<TriggerEvent> events = ConcurrentHashMap.newKeySet();
+
+   /** Starts a two-node cluster and suspends the .scheduled_maintenance trigger. */
+   @BeforeClass
+   public static void setupCluster() throws Exception {
+     configureCluster(2)
+         .addConfig("conf", configset("cloud-minimal"))
+         .configure();
+     // disable .scheduled_maintenance
+     postCommand(cluster.getSolrClient(), "{'suspend-trigger' : {'name' : '.scheduled_maintenance'}}");
+   }
+
+   /** POSTs an autoscaling command and asserts that the response reports success. */
+   private static void postCommand(SolrClient solrClient, String command) throws Exception {
+     SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, command);
+     NamedList<Object> response = solrClient.request(req);
+     assertEquals("success", response.get("result").toString());
+   }
+
+   /** Starts one more node and fails unless the trigger action runs within 20 seconds. */
+   private static JettySolrRunner startNodeAndAwaitTrigger() throws Exception {
+     JettySolrRunner node = cluster.startJettySolrRunner();
+     assertTrue("The trigger did not fire at all", triggerFiredLatch.await(20, TimeUnit.SECONDS));
+     return node;
+   }
+
+   /** Installs a fresh latch, then clears the fired flag, so the next action run can be observed. */
+   private static void rearmTrigger() {
+     triggerFiredLatch = new CountDownLatch(1);
+     triggerFired.set(false);
+   }
+
+   @Test
+   public void testCooldown() throws Exception {
+     CloudSolrClient solrClient = cluster.getSolrClient();
+     failDummyAction = false;
+     waitForSeconds = 1;
+     String setTriggerCommand = "{" +
+         "'set-trigger' : {" +
+         "'name' : 'node_added_cooldown_trigger'," +
+         "'event' : 'nodeAdded'," +
+         "'waitFor' : '" + waitForSeconds + "s'," +
+         "'enabled' : true," +
+         "'actions' : [{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}]" +
+         "}}";
+     postCommand(solrClient, setTriggerCommand);
+
+     String setListenerCommand1 = "{" +
+         "'set-listener' : {" +
+         "'name' : 'bar'," +
+         "'trigger' : 'node_added_cooldown_trigger'," +
+         "'stage' : ['FAILED','SUCCEEDED', 'IGNORED']," +
+         "'class' : '" + TestTriggerListener.class.getName() + "'" +
+         "}}";
+     postCommand(solrClient, setListenerCommand1);
+
+     listenerCreated = new CountDownLatch(1);
+     listenerEvents.clear();
+
+     startNodeAndAwaitTrigger();
+     assertTrue(triggerFired.get());
+     // wait for listener to capture the SUCCEEDED stage
+     Thread.sleep(1000);
+
+     List<CapturedEvent> capturedEvents = listenerEvents.get("bar");
+     assertNotNull("no events were captured for listener 'bar'", capturedEvents);
+     // we may get a few IGNORED events if other tests caused events within cooldown period
+     assertTrue(capturedEvents.toString(), capturedEvents.size() > 0);
+     long prevTimestamp = capturedEvents.get(capturedEvents.size() - 1).timestamp;
+
+     // reset the trigger and captured events
+     listenerEvents.clear();
+     rearmTrigger();
+
+     startNodeAndAwaitTrigger();
+     // wait for listener to capture the SUCCEEDED stage
+     Thread.sleep(2000);
+
+     // exactly one event is expected for this node, and it must be SUCCEEDED
+     capturedEvents = listenerEvents.get("bar");
+     assertNotNull("no events were captured for listener 'bar'", capturedEvents);
+     assertEquals(capturedEvents.toString(), 1, capturedEvents.size());
+     CapturedEvent ev = capturedEvents.get(0);
+     assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
+     // it must be later than the last event of the first round by more than the default cooldown
+     assertTrue("timestamp delta is less than default cooldown period",
+         ev.timestamp - prevTimestamp > TimeUnit.SECONDS.toNanos(ScheduledTriggers.DEFAULT_COOLDOWN_PERIOD_SECONDS));
+     prevTimestamp = ev.timestamp;
+
+     // this also resets the cooldown period
+     long modifiedCooldownPeriodSeconds = 7;
+     String setPropertiesCommand = "{\n" +
+         "\t\"set-properties\" : {\n" +
+         "\t\t\"" + AutoScalingParams.TRIGGER_COOLDOWN_PERIOD_SECONDS + "\" : " + modifiedCooldownPeriodSeconds + "\n" +
+         "\t}\n" +
+         "}";
+     solrClient.request(createAutoScalingRequest(SolrRequest.METHOD.POST, setPropertiesCommand));
+     solrClient.request(createAutoScalingRequest(SolrRequest.METHOD.GET, null));
+
+     // reset the trigger and captured events
+     listenerEvents.clear();
+     rearmTrigger();
+
+     startNodeAndAwaitTrigger();
+     rearmTrigger();
+     // add another node
+     startNodeAndAwaitTrigger();
+     // wait for listener to capture the SUCCEEDED stage
+     Thread.sleep(2000);
+
+     // there must be two SUCCEEDED (one per node just added) and maybe some ignored events
+     capturedEvents = listenerEvents.get("bar");
+     assertNotNull("no events were captured for listener 'bar'", capturedEvents);
+     assertTrue(capturedEvents.toString(), capturedEvents.size() >= 2);
+     // first event should be SUCCEEDED
+     ev = capturedEvents.get(0);
+     assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
+
+     ev = capturedEvents.get(capturedEvents.size() - 1);
+     assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
+     // the last SUCCEEDED must be later than the previous round's SUCCEEDED by more than the modified cooldown
+     assertTrue("timestamp delta is less than modified cooldown period",
+         ev.timestamp - prevTimestamp > TimeUnit.SECONDS.toNanos(modifiedCooldownPeriodSeconds));
+   }
+
+   /** Action that fails when run twice without a re-arm or before the trigger's waitFor has elapsed. */
+   public static class TestTriggerAction extends TriggerActionBase {
+     @Override
+     public void process(TriggerEvent event, ActionContext actionContext) {
+       try {
+         if (triggerFired.compareAndSet(false, true)) {
+           events.add(event);
+           long currentTimeNanos = timeSource.getTimeNs();
+           long eventTimeNanos = event.getEventTime();
+           long waitForNanos = TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
+           if (currentTimeNanos - eventTimeNanos <= waitForNanos) {
+             fail(event.source + " was fired before the configured waitFor period");
+           }
+           triggerFiredLatch.countDown();
+         } else {
+           fail(event.source + " was fired more than once!");
+         }
+       } catch (Throwable t) {
+         log.debug("--throwable", t);
+         throw t;
+       }
+     }
+
+     @Override
+     public void init(Map<String, String> args) {
+       log.info("TestTriggerAction init");
+       super.init(args);
+     }
+   }
+
+   /** Listener that stores every event it is notified about in listenerEvents, keyed by its config name. */
+   public static class TestTriggerListener extends TriggerListenerBase {
+     @Override
+     public void init(SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) {
+       super.init(cloudManager, config);
+       listenerCreated.countDown();
+     }
+
+     @Override
+     public synchronized void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
+                                      ActionContext context, Throwable error, String message) {
+       // synchronized list: the test may read it while this listener is still appending
+       List<CapturedEvent> lst = listenerEvents.computeIfAbsent(config.name,
+           s -> java.util.Collections.synchronizedList(new ArrayList<>()));
+       lst.add(new CapturedEvent(timeSource.getTimeNs(), context, config, stage, actionName, event, message));
+     }
+   }
+ }