You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2017/05/11 07:18:36 UTC

lucene-solr:feature/autoscaling: SOLR-10643: Throttling strategy for triggers and policy executions

Repository: lucene-solr
Updated Branches:
  refs/heads/feature/autoscaling 6a3f22ffd -> 269fdf461


SOLR-10643: Throttling strategy for triggers and policy executions


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/269fdf46
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/269fdf46
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/269fdf46

Branch: refs/heads/feature/autoscaling
Commit: 269fdf46136ba24e8777761945b0a39718366dd2
Parents: 6a3f22f
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Thu May 11 12:48:40 2017 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Thu May 11 12:48:40 2017 +0530

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   2 +
 .../solr/cloud/autoscaling/AutoScaling.java     |  17 ++-
 .../cloud/autoscaling/NodeAddedTrigger.java     |   8 +-
 .../solr/cloud/autoscaling/NodeLostTrigger.java |   7 +-
 .../cloud/autoscaling/ScheduledTriggers.java    |  72 +++++++---
 .../cloud/autoscaling/NodeAddedTriggerTest.java |  48 ++++++-
 .../cloud/autoscaling/NodeLostTriggerTest.java  |  60 +++++++-
 .../autoscaling/TriggerIntegrationTest.java     | 143 ++++++++++++++++++-
 8 files changed, 325 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/269fdf46/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 22aa372..1c2fc1c 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -129,6 +129,8 @@ Other Changes
   Add support for selecting specific properties from any compound metric using 'property' parameter to
   /admin/metrics handler. (ab)
 
+* SOLR-10643: Throttling strategy for triggers and policy executions. (shalin)
+
 ----------------------
 
 ==================  6.6.0 ==================

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/269fdf46/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
index 688aac5..cd08ea9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
@@ -58,13 +58,19 @@ public class AutoScaling {
   }
 
   public static interface TriggerListener<E extends TriggerEvent<? extends Trigger>> {
-    public void triggerFired(E event);
+    /**
+     * This method is executed when a trigger is ready to fire.
+     *
+     * @param event a subclass of {@link TriggerEvent}
+     * @return true if the listener was ready to perform actions on the event, false otherwise.
+     */
+    public boolean triggerFired(E event);
   }
 
   public static class HttpCallbackListener implements TriggerListener {
     @Override
-    public void triggerFired(TriggerEvent event) {
-
+    public boolean triggerFired(TriggerEvent event) {
+      return true;
     }
   }
 
@@ -80,6 +86,11 @@ public class AutoScaling {
    * is encouraged that implementations be immutable with the exception of the associated listener
    * which can be get/set by a different thread than the one executing the trigger. Therefore, implementations
    * should use appropriate synchronization around the listener.
+   * <p>
+   * When a trigger is ready to fire, it calls the {@link TriggerListener#triggerFired(TriggerEvent)} event
+   * with the proper trigger event object. If that method returns false then it should be interpreted to mean
+   * that Solr is not ready to process this trigger event and therefore we should retain the state and fire
+   * at the next invocation of the run() method.
    *
    * @param <E> the {@link TriggerEvent} which is handled by this Trigger
    */

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/269fdf46/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
index 00281c7..e44e7ef 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
@@ -202,9 +202,13 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
           AutoScaling.TriggerListener<NodeAddedEvent> listener = listenerRef.get();
           if (listener != null) {
             log.info("NodeAddedTrigger {} firing registered listener for node: {} added at {} nanotime, now: {} nanotime", name, nodeName, timeAdded, now);
-            listener.triggerFired(new NodeAddedEvent(this, timeAdded, nodeName));
+            if (listener.triggerFired(new NodeAddedEvent(this, timeAdded, nodeName))) {
+              // remove from tracking set only if the fire was accepted
+              trackingKeySet.remove(nodeName);
+            }
+          } else  {
+            trackingKeySet.remove(nodeName);
           }
-          trackingKeySet.remove(nodeName);
         }
       }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/269fdf46/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
index df1ea76..e52fa40 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -197,9 +197,12 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.Node
           AutoScaling.TriggerListener<NodeLostEvent> listener = listenerRef.get();
           if (listener != null) {
             log.info("NodeLostTrigger firing registered listener");
-            listener.triggerFired(new NodeLostEvent(this, timeRemoved, nodeName));
+            if (listener.triggerFired(new NodeLostEvent(this, timeRemoved, nodeName)))  {
+              trackingKeySet.remove(nodeName);
+            }
+          } else  {
+            trackingKeySet.remove(nodeName);
           }
-          trackingKeySet.remove(nodeName);
         }
       }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/269fdf46/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
index 1fbfff7..05b06ce 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
@@ -31,8 +31,10 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.solr.cloud.ActionThrottle;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.util.DefaultSolrThreadFactory;
@@ -45,7 +47,8 @@ import org.slf4j.LoggerFactory;
  */
 public class ScheduledTriggers implements Closeable {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  static final int SCHEDULED_TRIGGER_DELAY_SECONDS = 1;
+  static final int DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS = 1;
+  static final int DEFAULT_MIN_MS_BETWEEN_ACTIONS = 5000;
 
   private final Map<String, ScheduledTrigger> scheduledTriggers = new HashMap<>();
 
@@ -55,12 +58,18 @@ public class ScheduledTriggers implements Closeable {
   private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
 
   /**
-   * Single threaded executor to run the actions upon a trigger event
+   * Single threaded executor to run the actions upon a trigger event. We rely on this being a single
+   * threaded executor to ensure that trigger fires do not step on each other as well as to ensure
+   * that we do not run scheduled trigger threads while an action has been submitted to this executor
    */
   private final ExecutorService actionExecutor;
 
   private boolean isClosed = false;
 
+  private final AtomicBoolean hasPendingActions = new AtomicBoolean(false);
+
+  private final ActionThrottle actionThrottle;
+
   public ScheduledTriggers() {
     // todo make the core pool size configurable
     // it is important to use more than one because a time taking trigger can starve other scheduled triggers
@@ -72,6 +81,8 @@ public class ScheduledTriggers implements Closeable {
     scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
     scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
     actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor"));
+    // todo make the wait time configurable
+    actionThrottle = new ActionThrottle("action", DEFAULT_MIN_MS_BETWEEN_ACTIONS);
   }
 
   /**
@@ -101,23 +112,36 @@ public class ScheduledTriggers implements Closeable {
       AutoScaling.Trigger source = event.getSource();
       if (source.isClosed()) {
         log.warn("Ignoring autoscaling event because the source trigger: " + source + " has already been closed");
-        return;
+        // we do not want to lose this event just because the trigger were closed, perhaps a replacement will need it
+        return false;
       }
-      List<TriggerAction> actions = source.getActions();
-      if (actions != null) {
-        actionExecutor.submit(() -> {
-          for (TriggerAction action : actions) {
-            try {
-              action.process(event);
-            } catch (Exception e) {
-              log.error("Error executing action: " + action.getName() + " for trigger event: " + event, e);
-              throw e;
+      if (hasPendingActions.compareAndSet(false, true)) {
+        List<TriggerAction> actions = source.getActions();
+        if (actions != null) {
+          actionExecutor.submit(() -> {
+            assert hasPendingActions.get();
+            // let the action executor thread wait instead of the trigger thread so we use the throttle here
+            actionThrottle.minimumWaitBetweenActions();
+            actionThrottle.markAttemptingAction();
+            for (TriggerAction action : actions) {
+              try {
+                action.process(event);
+              } catch (Exception e) {
+                log.error("Error executing action: " + action.getName() + " for trigger event: " + event, e);
+                throw e;
+              } finally {
+                hasPendingActions.set(false);
+              }
             }
-          }
-        });
+          });
+        }
+        return true;
+      } else {
+        // there is an action in the queue and we don't want to enqueue another until it is complete
+        return false;
       }
     });
-    scheduledTrigger.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(newTrigger, 0, SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS);
+    scheduledTrigger.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(scheduledTrigger, 0, DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS);
   }
 
   /**
@@ -155,7 +179,7 @@ public class ScheduledTriggers implements Closeable {
     ExecutorUtil.shutdownAndAwaitTermination(actionExecutor);
   }
 
-  private static class ScheduledTrigger implements Closeable {
+  private class ScheduledTrigger implements Runnable, Closeable {
     AutoScaling.Trigger trigger;
     ScheduledFuture<?> scheduledFuture;
 
@@ -164,6 +188,22 @@ public class ScheduledTriggers implements Closeable {
     }
 
     @Override
+    public void run() {
+      // fire a trigger only if an action is not pending
+      // note this is not fool proof e.g. it does not prevent an action being executed while a trigger
+      // is still executing. There is additional protection against that scenario in the event listener.
+      if (!hasPendingActions.get())  {
+        try {
+          trigger.run();
+        } catch (Exception e) {
+          // log but do not propagate exception because an exception thrown from a scheduled operation
+          // will suppress future executions
+          log.error("Unexpected execution from trigger: " + trigger.getName(), e);
+        }
+      }
+    }
+
+    @Override
     public void close() throws IOException {
       if (scheduledFuture != null) {
         scheduledFuture.cancel(true);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/269fdf46/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
index d08f839..b0405cf 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
@@ -35,6 +36,12 @@ import org.junit.Test;
  * Test for {@link NodeAddedTrigger}
  */
 public class NodeAddedTriggerTest extends SolrCloudTestCase {
+
+  private AutoScaling.TriggerListener<NodeAddedTrigger.NodeAddedEvent> noFirstRunListener = event -> {
+    fail("Did not expect the listener to fire on first run!");
+    return true;
+  };
+
   @BeforeClass
   public static void setupCluster() throws Exception {
     configureCluster(1)
@@ -49,7 +56,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
     Map<String, Object> props = createTriggerProps(waitForSeconds);
 
     try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
-      trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
+      trigger.setListener(noFirstRunListener);
       trigger.run();
 
       JettySolrRunner newNode = cluster.startJettySolrRunner();
@@ -64,6 +71,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
         } else {
           fail("NodeAddedTrigger was fired more than once!");
         }
+        return true;
       });
       int counter = 0;
       do {
@@ -84,7 +92,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
     try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
       final long waitTime = 2;
       props.put("waitFor", waitTime);
-      trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
+      trigger.setListener(noFirstRunListener);
       trigger.run();
 
       JettySolrRunner newNode = cluster.startJettySolrRunner();
@@ -97,6 +105,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
         } else {
           fail("NodeAddedTrigger was fired more than once!");
         }
+        return true;
       });
       trigger.run(); // first run should detect the new node
       newNode.stop(); // stop the new jetty
@@ -115,6 +124,38 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
   }
 
   @Test
+  public void testListenerAcceptance() throws Exception {
+    CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
+    Map<String, Object> props = createTriggerProps(0);
+    try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
+      trigger.setListener(noFirstRunListener);
+      trigger.run(); // starts tracking live nodes
+
+      JettySolrRunner newNode = cluster.startJettySolrRunner();
+      AtomicInteger callCount = new AtomicInteger(0);
+      AtomicBoolean fired = new AtomicBoolean(false);
+
+      trigger.setListener(event -> {
+        if (callCount.incrementAndGet() < 2) {
+          return false;
+        } else  {
+          fired.compareAndSet(false, true);
+          return true;
+        }
+      });
+
+      trigger.run(); // first run should detect the new node and fire immediately but listener isn't ready
+      assertEquals(1, callCount.get());
+      assertFalse(fired.get());
+      trigger.run(); // second run should again fire
+      assertEquals(2, callCount.get());
+      assertTrue(fired.get());
+      trigger.run(); // should not fire
+      assertEquals(2, callCount.get());
+    }
+  }
+
+  @Test
   public void testRestoreState() throws Exception {
     CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
     long waitForSeconds = 1 + random().nextInt(5);
@@ -125,7 +166,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
     NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container);
     final long waitTime = 2;
     props.put("waitFor", waitTime);
-    trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
+    trigger.setListener(noFirstRunListener);
     trigger.run();
 
     JettySolrRunner newNode = cluster.startJettySolrRunner();
@@ -153,6 +194,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
         } else {
           fail("NodeAddedTrigger was fired more than once!");
         }
+        return true;
       });
       newTrigger.restoreState(trigger); // restore state from the old trigger
       int counter = 0;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/269fdf46/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
index 53a4458..efa63d3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
@@ -36,6 +37,11 @@ import org.junit.Test;
  */
 public class NodeLostTriggerTest extends SolrCloudTestCase {
 
+  private AutoScaling.TriggerListener<NodeLostTrigger.NodeLostEvent> noFirstRunListener = event -> {
+    fail("Did not expect the listener to fire on first run!");
+    return true;
+  };
+
   @BeforeClass
   public static void setupCluster() throws Exception {
     configureCluster(5)
@@ -50,7 +56,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
     Map<String, Object> props = createTriggerProps(waitForSeconds);
 
     try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
-      trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
+      trigger.setListener(noFirstRunListener);
       trigger.run();
       String lostNodeName = cluster.getJettySolrRunner(1).getNodeName();
       cluster.stopJettySolrRunner(1);
@@ -66,6 +72,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
         } else {
           fail("NodeLostListener was fired more than once!");
         }
+        return true;
       });
       int counter = 0;
       do {
@@ -87,7 +94,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
     try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
       final long waitTime = 2;
       props.put("waitFor", waitTime);
-      trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
+      trigger.setListener(noFirstRunListener);
       trigger.run();
 
       JettySolrRunner lostNode = cluster.getJettySolrRunner(1);
@@ -101,6 +108,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
         } else {
           fail("NodeLostListener was fired more than once!");
         }
+        return true;
       });
       trigger.run(); // first run should detect the lost node
       int counter = 0;
@@ -129,6 +137,51 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
   }
 
   @Test
+  public void testListenerAcceptance() throws Exception {
+    CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
+    Map<String, Object> props = createTriggerProps(0);
+    try (NodeLostTrigger trigger = new NodeLostTrigger("node_added_trigger", props, container)) {
+      trigger.setListener(noFirstRunListener);
+
+      JettySolrRunner newNode = cluster.startJettySolrRunner();
+      cluster.waitForAllNodes(5);
+
+      trigger.run(); // starts tracking live nodes
+
+      // stop the newly created node
+      List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
+      for (int i = 0; i < jettySolrRunners.size(); i++) {
+        JettySolrRunner jettySolrRunner = jettySolrRunners.get(i);
+        if (newNode == jettySolrRunner) {
+          cluster.stopJettySolrRunner(i);
+          break;
+        }
+      }
+
+      AtomicInteger callCount = new AtomicInteger(0);
+      AtomicBoolean fired = new AtomicBoolean(false);
+
+      trigger.setListener(event -> {
+        if (callCount.incrementAndGet() < 2) {
+          return false;
+        } else  {
+          fired.compareAndSet(false, true);
+          return true;
+        }
+      });
+
+      trigger.run(); // first run should detect the lost node and fire immediately but listener isn't ready
+      assertEquals(1, callCount.get());
+      assertFalse(fired.get());
+      trigger.run(); // second run should again fire
+      assertEquals(2, callCount.get());
+      assertTrue(fired.get());
+      trigger.run(); // should not fire
+      assertEquals(2, callCount.get());
+    }
+  }
+
+  @Test
   public void testRestoreState() throws Exception {
     CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
     long waitForSeconds = 1 + random().nextInt(5);
@@ -141,7 +194,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
     // and assert that the new trigger still fires
 
     NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container);
-    trigger.setListener(event -> fail("Did not expect the listener to fire on first run!"));
+    trigger.setListener(noFirstRunListener);
     trigger.run();
     newNode.stop();
     trigger.run(); // this run should detect the lost node
@@ -168,6 +221,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
         } else {
           fail("NodeLostListener was fired more than once!");
         }
+        return true;
       });
       newTrigger.restoreState(trigger); // restore state from the old trigger
       int counter = 0;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/269fdf46/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index a5ff24e..545c0d6 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -24,7 +24,9 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
@@ -45,7 +47,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.solr.cloud.autoscaling.ScheduledTriggers.SCHEDULED_TRIGGER_DELAY_SECONDS;
+import static org.apache.solr.cloud.autoscaling.ScheduledTriggers.DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS;
 import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
 
 /**
@@ -86,6 +88,141 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
   }
 
   @Test
+  public void testTriggerThrottling() throws Exception  {
+    // for this test we want to create two triggers so we must assert that the actions were created twice
+    TriggerIntegrationTest.actionCreated = new CountDownLatch(2);
+    // similarly we want both triggers to fire
+    triggerFiredLatch = new CountDownLatch(2);
+
+    CloudSolrClient solrClient = cluster.getSolrClient();
+
+    // first trigger
+    String setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger1'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '0s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
+        "}}";
+    SolrRequest req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
+    NamedList<Object> response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    // second trigger
+    setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_added_trigger2'," +
+        "'event' : 'nodeAdded'," +
+        "'waitFor' : '0s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
+        "}}";
+    req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    // wait until the two instances of action are created
+    if (!actionCreated.await(3, TimeUnit.SECONDS))  {
+      fail("Two TriggerAction instances should have been created by now");
+    }
+
+    JettySolrRunner newNode = cluster.startJettySolrRunner();
+
+    if (!triggerFiredLatch.await(10, TimeUnit.SECONDS)) {
+      fail("Both triggers should have fired by now");
+    }
+
+    // reset shared state
+    lastActionExecutedAt.set(0);
+    TriggerIntegrationTest.actionCreated = new CountDownLatch(2);
+    triggerFiredLatch = new CountDownLatch(2);
+
+    setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_lost_trigger1'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '0s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
+        "}}";
+    req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    setTriggerCommand = "{" +
+        "'set-trigger' : {" +
+        "'name' : 'node_lost_trigger2'," +
+        "'event' : 'nodeLost'," +
+        "'waitFor' : '0s'," +
+        "'enabled' : true," +
+        "'actions' : [{'name':'test','class':'" + ThrottingTesterAction.class.getName() + "'}]" +
+        "}}";
+    req = new AutoScalingHandlerTest.AutoScalingRequest(SolrRequest.METHOD.POST, path, setTriggerCommand);
+    response = solrClient.request(req);
+    assertEquals(response.get("result").toString(), "success");
+
+    // wait until the two instances of action are created
+    if (!actionCreated.await(3, TimeUnit.SECONDS))  {
+      fail("Two TriggerAction instances should have been created by now");
+    }
+
+    // stop the node we had started earlier
+    List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
+    for (int i = 0; i < jettySolrRunners.size(); i++) {
+      JettySolrRunner jettySolrRunner = jettySolrRunners.get(i);
+      if (jettySolrRunner == newNode) {
+        cluster.stopJettySolrRunner(i);
+        break;
+      }
+    }
+
+    if (!triggerFiredLatch.await(10, TimeUnit.SECONDS)) {
+      fail("Both triggers should have fired by now");
+    }
+  }
+
+  static AtomicLong lastActionExecutedAt = new AtomicLong(0);
+  static ReentrantLock lock = new ReentrantLock();
+  public static class ThrottingTesterAction extends TestTriggerAction {
+    // nanos are very precise so we need a delta for comparison with ms
+    private static final long DELTA_MS = 2;
+
+    // sanity check that an action instance is only invoked once
+    private final AtomicBoolean onlyOnce = new AtomicBoolean(false);
+
+    @Override
+    public void process(AutoScaling.TriggerEvent event) {
+      boolean locked = lock.tryLock();
+      if (!locked)  {
+        log.info("We should never have a tryLock fail because actions are never supposed to be executed concurrently");
+        return;
+      }
+      try {
+        if (lastActionExecutedAt.get() != 0)  {
+          log.info("last action at " + lastActionExecutedAt.get() + " nano time = " + System.nanoTime());
+          if (System.nanoTime() - lastActionExecutedAt.get() < TimeUnit.NANOSECONDS.convert(ScheduledTriggers.DEFAULT_MIN_MS_BETWEEN_ACTIONS - DELTA_MS, TimeUnit.MILLISECONDS)) {
+            log.info("action executed again before minimum wait time from {}", event.getSource().getName());
+            fail("TriggerListener was fired before the throttling period");
+          }
+        }
+        if (onlyOnce.compareAndSet(false, true)) {
+          log.info("action executed from {}", event.getSource().getName());
+          lastActionExecutedAt.set(System.nanoTime());
+          triggerFiredLatch.countDown();
+        } else  {
+          log.info("action executed more than once from {}", event.getSource().getName());
+          fail("Trigger should not have fired more than once!");
+        }
+      } finally {
+        if (locked) {
+          lock.unlock();
+        }
+      }
+    }
+  }
+
+  @Test
   public void testNodeLostTriggerRestoreState() throws Exception {
     // for this test we want to update the trigger so we must assert that the actions were created twice
     TriggerIntegrationTest.actionCreated = new CountDownLatch(2);
@@ -124,7 +261,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     cluster.stopJettySolrRunner(index);
 
     // ensure that the old trigger sees the stopped node, todo find a better way to do this
-    Thread.sleep(500 + TimeUnit.MILLISECONDS.convert(SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS));
+    Thread.sleep(500 + TimeUnit.MILLISECONDS.convert(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS));
 
     waitForSeconds = 0;
     setTriggerCommand = "{" +
@@ -182,7 +319,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     JettySolrRunner newNode = cluster.startJettySolrRunner();
 
     // ensure that the old trigger sees the new node, todo find a better way to do this
-    Thread.sleep(500 + TimeUnit.MILLISECONDS.convert(SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS));
+    Thread.sleep(500 + TimeUnit.MILLISECONDS.convert(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS));
 
     waitForSeconds = 0;
     setTriggerCommand = "{" +