You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/01/23 10:31:02 UTC
[32/41] lucene-solr:jira/solr-11702: SOLR-11747: Pause triggers until
actions finish executing and the cool down period expires
SOLR-11747: Pause triggers until actions finish executing and the cool down period expires
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/54253534
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/54253534
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/54253534
Branch: refs/heads/jira/solr-11702
Commit: 5425353402641307d71af727ff18c63e4579c5c1
Parents: f491fad
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Thu Jan 18 18:19:24 2018 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Thu Jan 18 18:19:24 2018 +0530
----------------------------------------------------------------------
solr/CHANGES.txt | 7 ++++
.../cloud/autoscaling/ScheduledTriggers.java | 36 ++++++++++++++++++--
.../autoscaling/TriggerIntegrationTest.java | 19 +++--------
.../cloud/autoscaling/sim/TestLargeCluster.java | 16 +--------
.../autoscaling/sim/TestTriggerIntegration.java | 9 ++---
5 files changed, 48 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/54253534/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index a6b2415..7f63679 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -62,6 +62,11 @@ Upgrade Notes
* SOLR-11809: QueryComponent's rq parameter parsing no longer considers the defType parameter.
+* SOLR-11747: The behaviour of the autoscaling system has been modified to pause all triggers from execution between
+ the start of actions and end of cool down period. The triggers will be resumed after the cool down period expires.
+ Previously, the cool down period was a fixed period started after actions for a trigger event finish
+ executing. During the cool down period, triggers would continue to run, but any events they generated were ignored.
+
New Features
----------------------
* SOLR-11285: Simulation framework for autoscaling. (ab)
@@ -156,6 +161,8 @@ Other Changes
* SOLR-11810: Upgrade Jetty to 9.4.8.v20171121 (Varun Thacker, Erick Erickson)
+* SOLR-11747: Pause triggers until actions finish executing and the cool down period expires. (shalin)
+
================== 7.2.1 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/54253534/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
index 25ec444..965299c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
@@ -250,7 +250,8 @@ public class ScheduledTriggers implements Closeable {
// we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
return false;
}
- // reject events during cooldown period
+ // even though we pause all triggers during action execution there is a possibility that a trigger was already
+ // running at the time and would have already created an event so we reject such events during cooldown period
if (cooldownStart.get() + cooldownPeriod.get() > cloudManager.getTimeSource().getTime()) {
log.debug("-------- Cooldown period - rejecting event: " + event);
event.getProperties().put(TriggerEvent.COOLDOWN, true);
@@ -260,6 +261,9 @@ public class ScheduledTriggers implements Closeable {
log.debug("++++++++ Cooldown inactive - processing event: " + event);
}
if (hasPendingActions.compareAndSet(false, true)) {
+ // pause all triggers while we execute actions so triggers do not operate on a cluster in transition
+ pauseTriggers();
+
final boolean enqueued;
if (replaying) {
enqueued = false;
@@ -271,7 +275,7 @@ public class ScheduledTriggers implements Closeable {
List<TriggerAction> actions = source.getActions();
if (actions != null) {
if (actionExecutor.isShutdown()) {
- String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the executor has already been closed", event.toString(), source);
+ String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s from trigger %s because the executor has already been closed", event.toString(), source);
listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
log.warn(msg);
// we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
@@ -311,6 +315,8 @@ public class ScheduledTriggers implements Closeable {
} finally {
cooldownStart.set(cloudManager.getTimeSource().getTime());
hasPendingActions.set(false);
+ // resume triggers after cool down period
+ resumeTriggers(cloudManager.getTimeSource().convertDelay(TimeUnit.NANOSECONDS, cooldownPeriod.get(), TimeUnit.MILLISECONDS));
}
log.debug("-- processing took {} ms for event id={}",
TimeUnit.NANOSECONDS.toMillis(cloudManager.getTimeSource().getTime() - eventProcessingStart), event.id);
@@ -325,6 +331,8 @@ public class ScheduledTriggers implements Closeable {
}
listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.SUCCEEDED);
hasPendingActions.set(false);
+ // resume triggers now
+ resumeTriggers(0);
}
return true;
} else {
@@ -339,6 +347,30 @@ public class ScheduledTriggers implements Closeable {
TimeUnit.MILLISECONDS);
}
+ /**
+ * Pauses all scheduled trigger invocations without interrupting any that are in progress
+ */
+ private synchronized void pauseTriggers() {
+ if (log.isDebugEnabled()) {
+ log.debug("Pausing all triggers: {}", scheduledTriggers.keySet());
+ }
+ scheduledTriggers.forEach((s, scheduledTrigger) -> scheduledTrigger.scheduledFuture.cancel(false));
+ }
+
+ /**
+ * Resumes all previously cancelled triggers to be scheduled after the given initial delay
+ * @param afterDelayMillis the initial delay in milliseconds after which triggers should be resumed
+ */
+ private synchronized void resumeTriggers(long afterDelayMillis) {
+ scheduledTriggers.forEach((s, scheduledTrigger) -> {
+ if (scheduledTrigger.scheduledFuture.isCancelled()) {
+ log.debug("Resuming trigger: {} after {}ms", s, afterDelayMillis);
+ scheduledTrigger.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(scheduledTrigger, afterDelayMillis,
+ cloudManager.getTimeSource().convertDelay(TimeUnit.SECONDS, triggerDelay.get(), TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
+ }
+ });
+ }
+
private void waitForPendingTasks(AutoScaling.Trigger newTrigger, List<TriggerAction> actions) throws AlreadyClosedException {
DistribStateManager stateManager = cloudManager.getDistribStateManager();
try {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/54253534/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index 639f240..3bce457 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -1194,13 +1194,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
// there must be exactly one captured event and it must be SUCCEEDED (triggers are paused during cooldown)
capturedEvents = listenerEvents.get("bar");
- assertTrue(capturedEvents.toString(), capturedEvents.size() > 1);
- for (int i = 0; i < capturedEvents.size() - 1; i++) {
- CapturedEvent ev = capturedEvents.get(i);
- assertEquals(ev.toString(), TriggerEventProcessorStage.IGNORED, ev.stage);
- assertTrue(ev.toString(), ev.message.contains("cooldown"));
- }
- CapturedEvent ev = capturedEvents.get(capturedEvents.size() - 1);
+ assertEquals(capturedEvents.toString(),1, capturedEvents.size());
+ CapturedEvent ev = capturedEvents.get(0);
assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
// the difference between timestamps of the first SUCCEEDED and the last SUCCEEDED
// must be larger than cooldown period
@@ -1235,19 +1230,13 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
// wait for listener to capture the SUCCEEDED stage
Thread.sleep(2000);
- // there must be at least one SUCCEEDED (due to newNode3) then for newNode4 one IGNORED
- // event due to cooldown, and one SUCCEEDED
+ // there must be two SUCCEEDED (due to newNode3 and newNode4) and maybe some ignored events
capturedEvents = listenerEvents.get("bar");
- assertTrue(capturedEvents.toString(), capturedEvents.size() > 2);
+ assertTrue(capturedEvents.toString(), capturedEvents.size() >= 2);
// first event should be SUCCEEDED
ev = capturedEvents.get(0);
assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
- for (int i = 1; i < capturedEvents.size() - 1; i++) {
- ev = capturedEvents.get(i);
- assertEquals(ev.toString(), TriggerEventProcessorStage.IGNORED, ev.stage);
- assertTrue(ev.toString(), ev.message.contains("cooldown"));
- }
ev = capturedEvents.get(capturedEvents.size() - 1);
assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
// the difference between timestamps of the first SUCCEEDED and the last SUCCEEDED
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/54253534/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
index 3adf652..e9c686b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestLargeCluster.java
@@ -254,21 +254,7 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
}
assertTrue("no STARTED event", startedEventPos > -1);
SolrInputDocument startedEvent = systemColl.get(startedEventPos);
- int ignored = 0;
int lastIgnoredPos = startedEventPos;
- for (int i = startedEventPos + 1; i < systemColl.size(); i++) {
- SolrInputDocument d = systemColl.get(i);
- if (!"node_added_trigger".equals(d.getFieldValue("event.source_s"))) {
- continue;
- }
- if ("NODEADDED".equals(d.getFieldValue("event.type_s"))) {
- if ("IGNORED".equals(d.getFieldValue("stage_s"))) {
- ignored++;
- lastIgnoredPos = i;
- }
- }
- }
- assertTrue("no IGNORED events", ignored > 0);
// make sure some replicas have been moved
assertTrue("no MOVEREPLICA ops?", cluster.simGetOpCount("MOVEREPLICA") > 0);
@@ -306,7 +292,7 @@ public class TestLargeCluster extends SimSolrCloudTestCase {
@Test
public void testNodeLost() throws Exception {
- doTestNodeLost(waitForSeconds, 5000, 1);
+ doTestNodeLost(waitForSeconds, 5000, 0);
}
// Renard R5 series - evenly covers a log10 range
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/54253534/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
index 3a118f2..807d269 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/TestTriggerIntegration.java
@@ -1095,14 +1095,9 @@ public class TestTriggerIntegration extends SimSolrCloudTestCase {
// wait for listener to capture the SUCCEEDED stage
cluster.getTimeSource().sleep(2000);
- // there must be at least one IGNORED event due to cooldown, and one SUCCEEDED event
+ // there must be at least one captured event, and the last one must be SUCCEEDED
capturedEvents = listenerEvents.get("bar");
- assertTrue(capturedEvents.toString(), capturedEvents.size() > 1);
- for (int i = 0; i < capturedEvents.size() - 1; i++) {
- CapturedEvent ev = capturedEvents.get(i);
- assertEquals(ev.toString(), TriggerEventProcessorStage.IGNORED, ev.stage);
- assertTrue(ev.toString(), ev.message.contains("cooldown"));
- }
+ assertTrue(capturedEvents.toString(), capturedEvents.size() >= 1);
CapturedEvent ev = capturedEvents.get(capturedEvents.size() - 1);
assertEquals(ev.toString(), TriggerEventProcessorStage.SUCCEEDED, ev.stage);
// the difference between timestamps of the first SUCCEEDED and the last SUCCEEDED