You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/01/23 10:31:02 UTC

[32/41] lucene-solr:jira/solr-11702: SOLR-11747: Pause triggers until actions finish executing and the cool down period expires

SOLR-11747: Pause triggers until actions finish executing and the cool down period expires


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/54253534
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/54253534
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/54253534

Branch: refs/heads/jira/solr-11702
Commit: 5425353402641307d71af727ff18c63e4579c5c1
Parents: f491fad
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Thu Jan 18 18:19:24 2018 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Thu Jan 18 18:19:24 2018 +0530

----------------------------------------------------------------------
 solr/CHANGES.txt                                |  7 ++++
 .../cloud/autoscaling/ScheduledTriggers.java    | 36 ++++++++++++++++++--
 .../autoscaling/TriggerIntegrationTest.java     | 19 +++--------
 .../cloud/autoscaling/sim/TestLargeCluster.java | 16 +--------
 .../autoscaling/sim/TestTriggerIntegration.java |  9 ++---
 5 files changed, 48 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/54253534/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index a6b2415..7f63679 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -62,6 +62,11 @@ Upgrade Notes
 
 * SOLR-11809: QueryComponent's rq parameter parsing no longer considers the defType parameter.
 
+* SOLR-11747: The behaviour of the autoscaling system has been modified to pause all triggers from execution between
+  the start of actions and end of cool down period. The triggers will be resumed after the cool down period expires.
+  Previously, the cool down period was a fixed period started after actions for a trigger event finish
+  executing. During the cool down period, triggers wo
+
 New Features
 ----------------------
 * SOLR-11285: Simulation framework for autoscaling. (ab)
@@ -156,6 +161,8 @@ Other Changes
 
 * SOLR-11810: Upgrade Jetty to 9.4.8.v20171121 (Varun Thacker, Erick Erickson)
 
+* SOLR-11747: Pause triggers until actions finish executing and the cool down period expires. (shalin)
+
 ==================  7.2.1 ==================
 
 Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/54253534/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
index 25ec444..965299c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
@@ -250,7 +250,8 @@ public class ScheduledTriggers implements Closeable {
         // we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
         return false;
       }
-      // reject events during cooldown period
+      // even though we pause all triggers during action execution there is a possibility that a trigger was already
+      // running at the time and would have already created an event so we reject such events during cooldown period
       if (cooldownStart.get() + cooldownPeriod.get() > cloudManager.getTimeSource().getTime()) {
         log.debug("-------- Cooldown period - rejecting event: " + event);
         event.getProperties().put(TriggerEvent.COOLDOWN, true);
@@ -260,6 +261,9 @@ public class ScheduledTriggers implements Closeable {
         log.debug("++++++++ Cooldown inactive - processing event: " + event);
       }
       if (hasPendingActions.compareAndSet(false, true)) {
+        // pause all triggers while we execute actions so triggers do not operate on a cluster in transition
+        pauseTriggers();
+
         final boolean enqueued;
         if (replaying) {
           enqueued = false;
@@ -271,7 +275,7 @@ public class ScheduledTriggers implements Closeable {
         List<TriggerAction> actions = source.getActions();
         if (actions != null) {
           if (actionExecutor.isShutdown()) {
-            String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the executor has already been closed", event.toString(), source);
+            String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s from trigger %s because the executor has already been closed", event.toString(), source);
             listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
             log.warn(msg);
             // we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
@@ -311,6 +315,8 @@ public class ScheduledTriggers implements Closeable {
             } finally {
               cooldownStart.set(cloudManager.getTimeSource().getTime());
               hasPendingActions.set(false);
+              // resume triggers after cool down period
+              resumeTriggers(cloudManager.getTimeSource().convertDelay(TimeUnit.NANOSECONDS, cooldownPeriod.get(), TimeUnit.MILLISECONDS));
             }
             log.debug("-- processing took {} ms for event id={}",
                 TimeUnit.NANOSECONDS.toMillis(cloudManager.getTimeSource().getTime() - eventProcessingStart), event.id);
@@ -325,6 +331,8 @@ public class ScheduledTriggers implements Closeable {
           }
           listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.SUCCEEDED);
           hasPendingActions.set(false);
+          // resume triggers now
+          resumeTriggers(0);
         }
         return true;
       } else {
@@ -339,6 +347,30 @@ public class ScheduledTriggers implements Closeable {
         TimeUnit.MILLISECONDS);
   }
 
+  /**
+   * Pauses all scheduled trigger invocations without interrupting any that are in progress
+   */
+  private synchronized void pauseTriggers()  {
+    if (log.isDebugEnabled()) {
+      log.debug("Pausing all triggers: {}", scheduledTriggers.keySet());
+    }
+    scheduledTriggers.forEach((s, scheduledTrigger) -> scheduledTrigger.scheduledFuture.cancel(false));
+  }
+
+  /**
+   * Resumes all previously cancelled triggers to be scheduled after the given initial delay
+   * @param afterDelayMillis the initial delay in milliseconds after which triggers should be resumed
+   */
+  private synchronized void resumeTriggers(long afterDelayMillis) {
+    scheduledTriggers.forEach((s, scheduledTrigger) ->  {
+      if (scheduledTrigger.scheduledFuture.isCancelled()) {
+        log.debug("Resuming trigger: {} after {}ms", s, afterDelayMillis);
+        scheduledTrigger.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(scheduledTrigger, afterDelayMillis,
+            cloudManager.getTimeSource().convertDelay(TimeUnit.SECONDS, triggerDelay.get(), TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
+      }
+    });
+  }
+
   private void waitForPendingTasks(AutoScaling.Trigger newTrigger, List<TriggerAction> actions) throws AlreadyClosedException {
     DistribStateManager stateManager = cloudManager.getDistribStateManager();
     try {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/54253534/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index 639f240..3bce457 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -1194,13 +1194,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
 
     // there must be at least one IGNORED event due to cooldown, and one SUCCEEDED event
     capturedEvents = listenerEvents.get("bar");
-    assertTrue(capturedEvents.toString(), capturedEvents.size() > 1);
-    for (int i = 0; i < capturedEvents.size() - 1; i++) {
-      CapturedEvent ev = capturedEvents.get(i);
-      assertEquals(ev.toString(), TriggerEventProcessorStage.IGNORED, ev.stage);
-      assertTrue(ev.toString(), ev.message.contains("cooldown"));
-    }
-    CapturedEvent ev = capturedEvents.get(capturedEvents.size() - 1);
+    assertEquals(capturedEvents.toString(),1,  capturedEvents.size());
+    CapturedEvent ev = capturedEvents.get(0);
     assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
     // the difference between timestamps of the first SUCCEEDED and the last SUCCEEDED
     // must be larger than cooldown period
@@ -1235,19 +1230,13 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     // wait for listener to capture the SUCCEEDED stage
     Thread.sleep(2000);
 
-    // there must be at least one SUCCEEDED (due to newNode3) then for newNode4 one IGNORED
-    // event due to cooldown, and one SUCCEEDED
+    // there must be two SUCCEEDED (due to newNode3 and newNode4) and maybe some ignored events
     capturedEvents = listenerEvents.get("bar");
-    assertTrue(capturedEvents.toString(), capturedEvents.size() > 2);
+    assertTrue(capturedEvents.toString(), capturedEvents.size() >= 2);
     // first event should be SUCCEEDED
     ev = capturedEvents.get(0);
     assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
 
-    for (int i = 1; i < capturedEvents.size() - 1; i++) {
-      ev = capturedEvents.get(i);
-      assertEquals(ev.toString(), TriggerEventProcessorStage.IGNORED, ev.stage);
-      assertTrue(ev.toString(), ev.message.contains("cooldown"));
-    }
     ev = capturedEvents.get(capturedEvents.size() - 1);
     assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
     // the difference between timestamps of the first SUCCEEDED and the last SUCCEEDED

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/54253534/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
index 3adf652..e9c686b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
@@ -254,21 +254,7 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
     }
     assertTrue("no STARTED event", startedEventPos > -1);
     SolrInputDocument startedEvent = systemColl.get(startedEventPos);
-    int ignored = 0;
     int lastIgnoredPos = startedEventPos;
-    for (int i = startedEventPos + 1; i < systemColl.size(); i++) {
-      SolrInputDocument d = systemColl.get(i);
-      if (!"node_added_trigger".equals(d.getFieldValue("event.source_s"))) {
-        continue;
-      }
-      if ("NODEADDED".equals(d.getFieldValue("event.type_s"))) {
-        if ("IGNORED".equals(d.getFieldValue("stage_s"))) {
-          ignored++;
-          lastIgnoredPos = i;
-        }
-      }
-    }
-    assertTrue("no IGNORED events", ignored > 0);
     // make sure some replicas have been moved
     assertTrue("no MOVEREPLICA ops?", cluster.simGetOpCount("MOVEREPLICA") > 0);
 
@@ -306,7 +292,7 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
 
   @Test
   public void testNodeLost() throws Exception {
-    doTestNodeLost(waitForSeconds, 5000, 1);
+    doTestNodeLost(waitForSeconds, 5000, 0);
   }
 
   // Renard R5 series - evenly covers a log10 range

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/54253534/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
index 3a118f2..807d269 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
@@ -1095,14 +1095,9 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
     // wait for listener to capture the SUCCEEDED stage
     cluster.getTimeSource().sleep(2000);
 
-    // there must be at least one IGNORED event due to cooldown, and one SUCCEEDED event
+    // there must be exactly one SUCCEEDED event
     capturedEvents = listenerEvents.get("bar");
-    assertTrue(capturedEvents.toString(), capturedEvents.size() > 1);
-    for (int i = 0; i < capturedEvents.size() - 1; i++) {
-      CapturedEvent ev = capturedEvents.get(i);
-      assertEquals(ev.toString(), TriggerEventProcessorStage.IGNORED, ev.stage);
-      assertTrue(ev.toString(), ev.message.contains("cooldown"));
-    }
+    assertTrue(capturedEvents.toString(), capturedEvents.size() >= 1);
     CapturedEvent ev = capturedEvents.get(capturedEvents.size() - 1);
     assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
     // the difference between timestamps of the first SUCCEEDED and the last SUCCEEDED