You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2017/05/11 07:18:36 UTC
lucene-solr:feature/autoscaling: SOLR-10643: Throttling strategy for
triggers and policy executions
Repository: lucene-solr
Updated Branches:
refs/heads/feature/autoscaling 6a3f22ffd -> 269fdf461
SOLR-10643: Throttling strategy for triggers and policy executions
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/269fdf46
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/269fdf46
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/269fdf46
Branch: refs/heads/feature/autoscaling
Commit: 269fdf46136ba24e8777761945b0a39718366dd2
Parents: 6a3f22f
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Thu May 11 12:48:40 2017 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Thu May 11 12:48:40 2017 +0530
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../solr/cloud/autoscaling/AutoScaling.java | 17 ++-
.../cloud/autoscaling/NodeAddedTrigger.java | 8 +-
.../solr/cloud/autoscaling/NodeLostTrigger.java | 7 +-
.../cloud/autoscaling/ScheduledTriggers.java | 72 +++++++---
.../cloud/autoscaling/NodeAddedTriggerTest.java | 48 ++++++-
.../cloud/autoscaling/NodeLostTriggerTest.java | 60 +++++++-
.../autoscaling/TriggerIntegrationTest.java | 143 ++++++++++++++++++-
8 files changed, 325 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/269fdf46/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 22aa372..1c2fc1c 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -129,6 +129,8 @@ Other Changes
Add support for selecting specific properties from any compound metric using 'property' parameter to
/admin/metrics handler. (ab)
+* SOLR-10643: Throttling strategy for triggers and policy executions. (shalin)
+
----------------------
================== 6.6.0 ==================
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/269fdf46/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
index 688aac5..cd08ea9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
@@ -58,13 +58,19 @@ public class AutoScaling {
}
public static interface TriggerListener<E extends TriggerEvent<? extends Trigger>> {
- public void triggerFired(E event);
+ /**
+ * This method is executed when a trigger is ready to fire.
+ *
+ * @param event a subclass of {@link TriggerEvent}
+ * @return true if the listener was ready to perform actions on the event, false otherwise.
+ */
+ public boolean triggerFired(E event);
}
public static class HttpCallbackListener implements TriggerListener {
@Override
- public void triggerFired(TriggerEvent event) {
-
+ public boolean triggerFired(TriggerEvent event) {
+ return true;
}
}
@@ -80,6 +86,11 @@ public class AutoScaling {
* is encouraged that implementations be immutable with the exception of the associated listener
* which can be get/set by a different thread than the one executing the trigger. Therefore, implementations
* should use appropriate synchronization around the listener.
+ * <p>
+ * When a trigger is ready to fire, it calls the {@link TriggerListener#triggerFired(TriggerEvent)} method
+ * with the proper trigger event object. If that method returns false then it should be interpreted to mean
+ * that Solr is not ready to process this trigger event and therefore we should retain the state and fire
+ * at the next invocation of the run() method.
*
* @param <E> the {@link TriggerEvent} which is handled by this Trigger
*/
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/269fdf46/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
index 00281c7..e44e7ef 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
@@ -202,9 +202,13 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
AutoScaling.TriggerListener<NodeAddedEvent> listener = listenerRef.get();
if (listener != null) {
log.info("NodeAddedTrigger {} firing registered listener for node: {} added at {} nanotime, now: {} nanotime", name, nodeName, timeAdded, now);
- listener.triggerFired(new NodeAddedEvent(this, timeAdded, nodeName));
+ if (listener.triggerFired(new NodeAddedEvent(this, timeAdded, nodeName))) {
+ // remove from tracking set only if the fire was accepted
+ trackingKeySet.remove(nodeName);
+ }
+ } else {
+ trackingKeySet.remove(nodeName);
}
- trackingKeySet.remove(nodeName);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/269fdf46/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
index df1ea76..e52fa40 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -197,9 +197,12 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.Node
AutoScaling.TriggerListener<NodeLostEvent> listener = listenerRef.get();
if (listener != null) {
log.info("NodeLostTrigger firing registered listener");
- listener.triggerFired(new NodeLostEvent(this, timeRemoved, nodeName));
+ if (listener.triggerFired(new NodeLostEvent(this, timeRemoved, nodeName))) {
+ trackingKeySet.remove(nodeName);
+ }
+ } else {
+ trackingKeySet.remove(nodeName);
}
- trackingKeySet.remove(nodeName);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/269fdf46/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
index 1fbfff7..05b06ce 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
@@ -31,8 +31,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.solr.cloud.ActionThrottle;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.util.DefaultSolrThreadFactory;
@@ -45,7 +47,8 @@ import org.slf4j.LoggerFactory;
*/
public class ScheduledTriggers implements Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- static final int SCHEDULED_TRIGGER_DELAY_SECONDS = 1;
+ static final int DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS = 1;
+ static final int DEFAULT_MIN_MS_BETWEEN_ACTIONS = 5000;
private final Map<String, ScheduledTrigger> scheduledTriggers = new HashMap<>();
@@ -55,12 +58,18 @@ public class ScheduledTriggers implements Closeable {
private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
/**
- * Single threaded executor to run the actions upon a trigger event
+ * Single threaded executor to run the actions upon a trigger event. We rely on this being a single
+ * threaded executor to ensure that trigger fires do not step on each other as well as to ensure
+ * that we do not run scheduled trigger threads while an action has been submitted to this executor
*/
private final ExecutorService actionExecutor;
private boolean isClosed = false;
+ private final AtomicBoolean hasPendingActions = new AtomicBoolean(false);
+
+ private final ActionThrottle actionThrottle;
+
public ScheduledTriggers() {
// todo make the core pool size configurable
// it is important to use more than one because a time taking trigger can starve other scheduled triggers
@@ -72,6 +81,8 @@ public class ScheduledTriggers implements Closeable {
scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor"));
+ // todo make the wait time configurable
+ actionThrottle = new ActionThrottle("action", DEFAULT_MIN_MS_BETWEEN_ACTIONS);
}
/**
@@ -101,23 +112,36 @@ public class ScheduledTriggers implements Closeable {
AutoScaling.Trigger source = event.getSource();
if (source.isClosed()) {
log.warn("Ignoring autoscaling event because the source trigger: " + source + " has already been closed");
- return;
+ // we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
+ return false;
}
- List<TriggerAction> actions = source.getActions();
- if (actions != null) {
- actionExecutor.submit(() -> {
- for (TriggerAction action : actions) {
- try {
- action.process(event);
- } catch (Exception e) {
- log.error("Error executing action: " + action.getName() + " for trigger event: " + event, e);
- throw e;
+ if (hasPendingActions.compareAndSet(false, true)) {
+ List<TriggerAction> actions = source.getActions();
+ if (actions != null) {
+ actionExecutor.submit(() -> {
+ assert hasPendingActions.get();
+ // let the action executor thread wait instead of the trigger thread so we use the throttle here
+ actionThrottle.minimumWaitBetweenActions();
+ actionThrottle.markAttemptingAction();
+ for (TriggerAction action : actions) {
+ try {
+ action.process(event);
+ } catch (Exception e) {
+ log.error("Error executing action: " + action.getName() + " for trigger event: " + event, e);
+ throw e;
+ } finally {
+ hasPendingActions.set(false);
+ }
}
- }
- });
+ });
+ }
+ return true;
+ } else {
+ // there is an action in the queue and we don't want to enqueue another until it is complete
+ return false;
}
});
- scheduledTrigger.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(newTrigger, 0, SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS);
+ scheduledTrigger.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(scheduledTrigger, 0, DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS);
}
/**
@@ -155,7 +179,7 @@ public class ScheduledTriggers implements Closeable {
ExecutorUtil.shutdownAndAwaitTermination(actionExecutor);
}
- private static class ScheduledTrigger implements Closeable {
+ private class ScheduledTrigger implements Runnable, Closeable {
AutoScaling.Trigger trigger;
ScheduledFuture<?> scheduledFuture;
@@ -164,6 +188,22 @@ public class ScheduledTriggers implements Closeable {
}
@Override
+ public void run() {
+ // fire a trigger only if an action is not pending
+ // note this is not foolproof, e.g. it does not prevent an action from being executed while a trigger
+ // is still executing. There is additional protection against that scenario in the event listener.
+ if (!hasPendingActions.get()) {
+ try {
+ trigger.run();
+ } catch (Exception e) {
+ // log but do not propagate exception because an exception thrown from a scheduled operation
+ // will suppress future executions
+ log.error("Unexpected execution from trigger: " + trigger.getName(), e);
+ }
+ }
+ }
+
+ @Override
public void close() throws IOException {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/269fdf46/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
index d08f839..b0405cf 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
@@ -35,6 +36,12 @@ import org.junit.Test;
* Test for {@link NodeAddedTrigger}
*/
public class NodeAddedTriggerTest extends SolrCloudTestCase {
+
+ private AutoScaling.TriggerListener<NodeAddedTrigger.NodeAddedEvent> noFirstRunListener = event -> {
+ fail("Did not expect the listener to fire on first run!");
+ return true;
+ };
+
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(1)
@@ -49,7 +56,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
Map<String, Object> props = createTriggerProps(waitForSeconds);
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
- trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
+ trigger.setListener(noFirstRunListener);
trigger.run();
JettySolrRunner newNode = cluster.startJettySolrRunner();
@@ -64,6 +71,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
} else {
fail("NodeAddedTrigger was fired more than once!");
}
+ return true;
});
int counter = 0;
do {
@@ -84,7 +92,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
final long waitTime = 2;
props.put("waitFor", waitTime);
- trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
+ trigger.setListener(noFirstRunListener);
trigger.run();
JettySolrRunner newNode = cluster.startJettySolrRunner();
@@ -97,6 +105,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
} else {
fail("NodeAddedTrigger was fired more than once!");
}
+ return true;
});
trigger.run(); // first run should detect the new node
newNode.stop(); // stop the new jetty
@@ -115,6 +124,38 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
}
@Test
+ public void testListenerAcceptance() throws Exception {
+ CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
+ Map<String, Object> props = createTriggerProps(0);
+ try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
+ trigger.setListener(noFirstRunListener);
+ trigger.run(); // starts tracking live nodes
+
+ JettySolrRunner newNode = cluster.startJettySolrRunner();
+ AtomicInteger callCount = new AtomicInteger(0);
+ AtomicBoolean fired = new AtomicBoolean(false);
+
+ trigger.setListener(event -> {
+ if (callCount.incrementAndGet() < 2) {
+ return false;
+ } else {
+ fired.compareAndSet(false, true);
+ return true;
+ }
+ });
+
+ trigger.run(); // first run should detect the new node and fire immediately but listener isn't ready
+ assertEquals(1, callCount.get());
+ assertFalse(fired.get());
+ trigger.run(); // second run should again fire
+ assertEquals(2, callCount.get());
+ assertTrue(fired.get());
+ trigger.run(); // should not fire
+ assertEquals(2, callCount.get());
+ }
+ }
+
+ @Test
public void testRestoreState() throws Exception {
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
long waitForSeconds = 1 + random().nextInt(5);
@@ -125,7 +166,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container);
final long waitTime = 2;
props.put("waitFor", waitTime);
- trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
+ trigger.setListener(noFirstRunListener);
trigger.run();
JettySolrRunner newNode = cluster.startJettySolrRunner();
@@ -153,6 +194,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
} else {
fail("NodeAddedTrigger was fired more than once!");
}
+ return true;
});
newTrigger.restoreState(trigger); // restore state from the old trigger
int counter = 0;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/269fdf46/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
index 53a4458..efa63d3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
@@ -36,6 +37,11 @@ import org.junit.Test;
*/
public class NodeLostTriggerTest extends SolrCloudTestCase {
+ private AutoScaling.TriggerListener<NodeLostTrigger.NodeLostEvent> noFirstRunListener = event -> {
+ fail("Did not expect the listener to fire on first run!");
+ return true;
+ };
+
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(5)
@@ -50,7 +56,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
Map<String, Object> props = createTriggerProps(waitForSeconds);
try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
- trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
+ trigger.setListener(noFirstRunListener);
trigger.run();
String lostNodeName = cluster.getJettySolrRunner(1).getNodeName();
cluster.stopJettySolrRunner(1);
@@ -66,6 +72,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
} else {
fail("NodeLostListener was fired more than once!");
}
+ return true;
});
int counter = 0;
do {
@@ -87,7 +94,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
final long waitTime = 2;
props.put("waitFor", waitTime);
- trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
+ trigger.setListener(noFirstRunListener);
trigger.run();
JettySolrRunner lostNode = cluster.getJettySolrRunner(1);
@@ -101,6 +108,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
} else {
fail("NodeLostListener was fired more than once!");
}
+ return true;
});
trigger.run(); // first run should detect the lost node
int counter = 0;
@@ -129,6 +137,51 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
}
@Test
+ public void testListenerAcceptance() throws Exception {
+ CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
+ Map<String, Object> props = createTriggerProps(0);
+ try (NodeLostTrigger trigger = new NodeLostTrigger("node_added_trigger", props, container)) {
+ trigger.setListener(noFirstRunListener);
+
+ JettySolrRunner newNode = cluster.startJettySolrRunner();
+ cluster.waitForAllNodes(5);
+
+ trigger.run(); // starts tracking live nodes
+
+ // stop the newly created node
+ List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
+ for (int i = 0; i < jettySolrRunners.size(); i++) {
+ JettySolrRunner jettySolrRunner = jettySolrRunners.get(i);
+ if (newNode == jettySolrRunner) {
+ cluster.stopJettySolrRunner(i);
+ break;
+ }
+ }
+
+ AtomicInteger callCount = new AtomicInteger(0);
+ AtomicBoolean fired = new AtomicBoolean(false);
+
+ trigger.setListener(event -> {
+ if (callCount.incrementAndGet() < 2) {
+ return false;
+ } else {
+ fired.compareAndSet(false, true);
+ return true;
+ }
+ });
+
+ trigger.run(); // first run should detect the lost node and fire immediately but listener isn't ready
+ assertEquals(1, callCount.get());
+ assertFalse(fired.get());
+ trigger.run(); // second run should again fire
+ assertEquals(2, callCount.get());
+ assertTrue(fired.get());
+ trigger.run(); // should not fire
+ assertEquals(2, callCount.get());
+ }
+ }
+
+ @Test
public void testRestoreState() throws Exception {
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
long waitForSeconds = 1 + random().nextInt(5);
@@ -141,7 +194,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
// and assert that the new trigger still fires
NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container);
- trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
+ trigger.setListener(noFirstRunListener);
trigger.run();
newNode.stop();
trigger.run(); // this run should detect the lost node
@@ -168,6 +221,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
} else {
fail("NodeLostListener was fired more than once!");
}
+ return true;
});
newTrigger.restoreState(trigger); // restore state from the old trigger
int counter = 0;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/269fdf46/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index a5ff24e..545c0d6 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -24,7 +24,9 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
@@ -45,7 +47,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.cloud.autoscaling.ScheduledTriggers.SCHEDULED_TRIGGER_DELAY_SECONDS;
+import static org.apache.solr.cloud.autoscaling.ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS;
import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
/**
@@ -86,6 +88,141 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
@Test
+ public void testTriggerThrottling() throws Exception {
+ // for this test we want to create two triggers so we must assert that the actions were created twice
+ TriggerIntegrationTest.actionCreated = new CountDownLatch(2);
+ // similarly we want both triggers to fire
+ triggerFiredLatch = new CountDownLatch(2);
+
+ CloudSolrClient solrClient = cluster.getSolrClient();
+
+ // first trigger
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_added_trigger1'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor' : '0s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
+ "}}";
+ SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // second trigger
+ setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_added_trigger2'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor' : '0s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
+ "}}";
+ req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // wait until the two instances of action are created
+ if (!actionCreated.await(3, TimeUnit.SECONDS)) {
+ fail("Two TriggerAction instances should have been created by now");
+ }
+
+ JettySolrRunner newNode = cluster.startJettySolrRunner();
+
+ if (!triggerFiredLatch.await(10, TimeUnit.SECONDS)) {
+ fail("Both triggers should have fired by now");
+ }
+
+ // reset shared state
+ lastActionExecutedAt.set(0);
+ TriggerIntegrationTest.actionCreated = new CountDownLatch(2);
+ triggerFiredLatch = new CountDownLatch(2);
+
+ setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_lost_trigger1'," +
+ "'event' : 'nodeLost'," +
+ "'waitFor' : '0s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
+ "}}";
+ req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_lost_trigger2'," +
+ "'event' : 'nodeLost'," +
+ "'waitFor' : '0s'," +
+ "'enabled' : true," +
+ "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
+ "}}";
+ req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ // wait until the two instances of action are created
+ if (!actionCreated.await(3, TimeUnit.SECONDS)) {
+ fail("Two TriggerAction instances should have been created by now");
+ }
+
+ // stop the node we had started earlier
+ List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
+ for (int i = 0; i < jettySolrRunners.size(); i++) {
+ JettySolrRunner jettySolrRunner = jettySolrRunners.get(i);
+ if (jettySolrRunner == newNode) {
+ cluster.stopJettySolrRunner(i);
+ break;
+ }
+ }
+
+ if (!triggerFiredLatch.await(10, TimeUnit.SECONDS)) {
+ fail("Both triggers should have fired by now");
+ }
+ }
+
+ static AtomicLong lastActionExecutedAt = new AtomicLong(0);
+ static ReentrantLock lock = new ReentrantLock();
+ public static class ThrottingTesterAction extends TestTriggerAction {
+ // nanos are very precise so we need a delta for comparison with ms
+ private static final long DELTA_MS = 2;
+
+ // sanity check that an action instance is only invoked once
+ private final AtomicBoolean onlyOnce = new AtomicBoolean(false);
+
+ @Override
+ public void process(AutoScaling.TriggerEvent event) {
+ boolean locked = lock.tryLock();
+ if (!locked) {
+ log.info("We should never have a tryLock fail because actions are never supposed to be executed concurrently");
+ return;
+ }
+ try {
+ if (lastActionExecutedAt.get() != 0) {
+ log.info("last action at " + lastActionExecutedAt.get() + " nano time = " + System.nanoTime());
+ if (System.nanoTime() - lastActionExecutedAt.get() < TimeUnit.NANOSECONDS.convert(ScheduledTriggers.DEFAULT_MIN_MS_BETWEEN_ACTIONS - DELTA_MS, TimeUnit.MILLISECONDS)) {
+ log.info("action executed again before minimum wait time from {}", event.getSource().getName());
+ fail("TriggerListener was fired before the throttling period");
+ }
+ }
+ if (onlyOnce.compareAndSet(false, true)) {
+ log.info("action executed from {}", event.getSource().getName());
+ lastActionExecutedAt.set(System.nanoTime());
+ triggerFiredLatch.countDown();
+ } else {
+ log.info("action executed more than once from {}", event.getSource().getName());
+ fail("Trigger should not have fired more than once!");
+ }
+ } finally {
+ if (locked) {
+ lock.unlock();
+ }
+ }
+ }
+ }
+
+ @Test
public void testNodeLostTriggerRestoreState() throws Exception {
// for this test we want to update the trigger so we must assert that the actions were created twice
TriggerIntegrationTest.actionCreated = new CountDownLatch(2);
@@ -124,7 +261,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
cluster.stopJettySolrRunner(index);
// ensure that the old trigger sees the stopped node, todo find a better way to do this
- Thread.sleep(500 + TimeUnit.MILLISECONDS.convert(SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS));
+ Thread.sleep(500 + TimeUnit.MILLISECONDS.convert(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS));
waitForSeconds = 0;
setTriggerCommand = "{" +
@@ -182,7 +319,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
JettySolrRunner newNode = cluster.startJettySolrRunner();
// ensure that the old trigger sees the new node, todo find a better way to do this
- Thread.sleep(500 + TimeUnit.MILLISECONDS.convert(SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS));
+ Thread.sleep(500 + TimeUnit.MILLISECONDS.convert(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS));
waitForSeconds = 0;
setTriggerCommand = "{" +