You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/07/10 17:04:24 UTC
[1/2] lucene-solr:feature/autoscaling: SOLR-10996: Implement
TriggerListener API.
Repository: lucene-solr
Updated Branches:
refs/heads/feature/autoscaling b8c86d24d -> 9c8e829f5
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9c8e829f/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index 0ceb875..6e162f5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -17,8 +17,9 @@
package org.apache.solr.cloud.autoscaling;
-import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -42,6 +43,7 @@ import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.CoreContainer;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TimeOut;
import org.apache.solr.util.TimeSource;
@@ -566,18 +568,13 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
newNode.getNodeName(), nodeAddedEvent.getProperty(TriggerEvent.NODE_NAME));
}
- public static class TestTriggerAction implements TriggerAction {
+ public static class TestTriggerAction extends TriggerActionBase {
public TestTriggerAction() {
actionConstructorCalled.countDown();
}
@Override
- public String getName() {
- return "TestTriggerAction";
- }
-
- @Override
public void process(TriggerEvent event, ActionContext actionContext) {
try {
if (triggerFired.compareAndSet(false, true)) {
@@ -596,29 +593,20 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
@Override
- public void close() throws IOException {
-
- }
-
- @Override
public void init(Map<String, String> args) {
log.info("TestTriggerAction init");
actionInitCalled.countDown();
+ super.init(args);
}
}
- public static class TestEventQueueAction implements TriggerAction {
+ public static class TestEventQueueAction extends TriggerActionBase {
public TestEventQueueAction() {
log.info("TestEventQueueAction instantiated");
}
@Override
- public String getName() {
- return this.getClass().getSimpleName();
- }
-
- @Override
public void process(TriggerEvent event, ActionContext actionContext) {
log.info("-- event: " + event);
events.add(event);
@@ -634,14 +622,10 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
@Override
- public void close() throws IOException {
-
- }
-
- @Override
public void init(Map<String, String> args) {
log.debug("TestTriggerAction init");
actionInitCalled.countDown();
+ super.init(args);
}
}
@@ -790,18 +774,13 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
return listener;
}
- public static class TestEventMarkerAction implements TriggerAction {
+ public static class TestEventMarkerAction extends TriggerActionBase {
public TestEventMarkerAction() {
actionConstructorCalled.countDown();
}
@Override
- public String getName() {
- return "TestEventMarkerAction";
- }
-
- @Override
public void process(TriggerEvent event, ActionContext actionContext) {
boolean locked = lock.tryLock();
if (!locked) {
@@ -820,14 +799,10 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
@Override
- public void close() throws IOException {
-
- }
-
- @Override
public void init(Map<String, String> args) {
log.info("TestEventMarkerAction init");
actionInitCalled.countDown();
+ super.init(args);
}
}
@@ -952,4 +927,209 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertEquals(overseerLeader, ev.getProperty(TriggerEvent.NODE_NAME));
assertEquals(AutoScaling.EventType.NODELOST, ev.getEventType());
}
+
+ private static class TestEvent {
+ final AutoScalingConfig.TriggerListenerConfig config;
+ final AutoScaling.EventProcessorStage stage;
+ final String actionName;
+ final TriggerEvent event;
+ final String message;
+
+ TestEvent(AutoScalingConfig.TriggerListenerConfig config, AutoScaling.EventProcessorStage stage, String actionName, TriggerEvent event, String message) {
+ this.config = config;
+ this.stage = stage;
+ this.actionName = actionName;
+ this.event = event;
+ this.message = message;
+ }
+
+ @Override
+ public String toString() {
+ return "TestEvent{" +
+ "config=" + config +
+ ", stage=" + stage +
+ ", actionName='" + actionName + '\'' +
+ ", event=" + event +
+ ", message='" + message + '\'' +
+ '}';
+ }
+ }
+
+ static Map<String, List<TestEvent>> listenerEvents = new HashMap<>();
+ static CountDownLatch listenerCreated = new CountDownLatch(1);
+ static boolean failDummyAction = false;
+
+ public static class TestTriggerListener extends TriggerListenerBase {
+ @Override
+ public void init(CoreContainer coreContainer, AutoScalingConfig.TriggerListenerConfig config) {
+ super.init(coreContainer, config);
+ listenerCreated.countDown();
+ }
+
+ @Override
+ public synchronized void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName,
+ ActionContext context, Throwable error, String message) {
+ List<TestEvent> lst = listenerEvents.get(config.name);
+ if (lst == null) {
+ lst = new ArrayList<>();
+ listenerEvents.put(config.name, lst);
+ }
+ lst.add(new TestEvent(config, stage, actionName, event, message));
+ }
+ }
+
+ public static class TestDummyAction extends TriggerActionBase {
+
+ @Override
+ public void process(TriggerEvent event, ActionContext context) {
+ if (failDummyAction) {
+ throw new RuntimeException("failure");
+ }
+
+ }
+ }
+
+ @Test
+ public void testListeners() throws Exception {
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_added_trigger'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor' : '" + waitForSeconds + "s'," +
+ "'enabled' : true," +
+ "'actions' : [" +
+ "{'name':'test','class':'" + TestTriggerAction.class.getName() + "'}," +
+ "{'name':'test1','class':'" + TestDummyAction.class.getName() + "'}," +
+ "]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ if (!actionInitCalled.await(3, TimeUnit.SECONDS)) {
+ fail("The TriggerAction should have been created by now");
+ }
+
+ String setListenerCommand = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'foo'," +
+ "'trigger' : 'node_added_trigger'," +
+ "'stage' : ['WAITING', 'STARTED','ABORTED','SUCCEEDED', 'FAILED']," +
+ "'beforeAction' : 'test'," +
+ "'afterAction' : ['test', 'test1']," +
+ "'class' : '" + TestTriggerListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ String setListenerCommand1 = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'bar'," +
+ "'trigger' : 'node_added_trigger'," +
+ "'stage' : ['WAITING','FAILED','SUCCEEDED']," +
+ "'beforeAction' : ['test', 'test1']," +
+ "'afterAction' : 'test'," +
+ "'class' : '" + TestTriggerListener.class.getName() + "'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand1);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ listenerEvents.clear();
+ failDummyAction = false;
+
+ JettySolrRunner newNode = cluster.startJettySolrRunner();
+ boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+ assertTrue(triggerFired.get());
+
+ assertEquals("both listeners should have fired", 2, listenerEvents.size());
+
+ Thread.sleep(2000);
+
+ // check foo events
+ List<TestEvent> testEvents = listenerEvents.get("foo");
+ assertNotNull("foo events: " + testEvents, testEvents);
+ assertEquals("foo events: " + testEvents, 5, testEvents.size());
+
+ assertEquals(AutoScaling.EventProcessorStage.STARTED, testEvents.get(0).stage);
+
+ assertEquals(AutoScaling.EventProcessorStage.BEFORE_ACTION, testEvents.get(1).stage);
+ assertEquals("test", testEvents.get(1).actionName);
+
+ assertEquals(AutoScaling.EventProcessorStage.AFTER_ACTION, testEvents.get(2).stage);
+ assertEquals("test", testEvents.get(2).actionName);
+
+ assertEquals(AutoScaling.EventProcessorStage.AFTER_ACTION, testEvents.get(3).stage);
+ assertEquals("test1", testEvents.get(3).actionName);
+
+ assertEquals(AutoScaling.EventProcessorStage.SUCCEEDED, testEvents.get(4).stage);
+
+ // check bar events
+ testEvents = listenerEvents.get("bar");
+ assertNotNull("bar events", testEvents);
+ assertEquals("bar events", 4, testEvents.size());
+
+ assertEquals(AutoScaling.EventProcessorStage.BEFORE_ACTION, testEvents.get(0).stage);
+ assertEquals("test", testEvents.get(0).actionName);
+
+ assertEquals(AutoScaling.EventProcessorStage.AFTER_ACTION, testEvents.get(1).stage);
+ assertEquals("test", testEvents.get(1).actionName);
+
+ assertEquals(AutoScaling.EventProcessorStage.BEFORE_ACTION, testEvents.get(2).stage);
+ assertEquals("test1", testEvents.get(2).actionName);
+
+ assertEquals(AutoScaling.EventProcessorStage.SUCCEEDED, testEvents.get(3).stage);
+
+ // reset
+ triggerFired.set(false);
+ triggerFiredLatch = new CountDownLatch(1);
+ listenerEvents.clear();
+ failDummyAction = true;
+
+ newNode = cluster.startJettySolrRunner();
+ await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+
+ Thread.sleep(2000);
+
+ // check foo events
+ testEvents = listenerEvents.get("foo");
+ assertNotNull("foo events: " + testEvents, testEvents);
+ assertEquals("foo events: " + testEvents, 4, testEvents.size());
+
+ assertEquals(AutoScaling.EventProcessorStage.STARTED, testEvents.get(0).stage);
+
+ assertEquals(AutoScaling.EventProcessorStage.BEFORE_ACTION, testEvents.get(1).stage);
+ assertEquals("test", testEvents.get(1).actionName);
+
+ assertEquals(AutoScaling.EventProcessorStage.AFTER_ACTION, testEvents.get(2).stage);
+ assertEquals("test", testEvents.get(2).actionName);
+
+ assertEquals(AutoScaling.EventProcessorStage.FAILED, testEvents.get(3).stage);
+ assertEquals("test1", testEvents.get(3).actionName);
+
+ // check bar events
+ testEvents = listenerEvents.get("bar");
+ assertNotNull("bar events", testEvents);
+ assertEquals("bar events", 4, testEvents.size());
+
+ assertEquals(AutoScaling.EventProcessorStage.BEFORE_ACTION, testEvents.get(0).stage);
+ assertEquals("test", testEvents.get(0).actionName);
+
+ assertEquals(AutoScaling.EventProcessorStage.AFTER_ACTION, testEvents.get(1).stage);
+ assertEquals("test", testEvents.get(1).actionName);
+
+ assertEquals(AutoScaling.EventProcessorStage.BEFORE_ACTION, testEvents.get(2).stage);
+ assertEquals("test1", testEvents.get(2).actionName);
+
+ assertEquals(AutoScaling.EventProcessorStage.FAILED, testEvents.get(3).stage);
+ assertEquals("test1", testEvents.get(3).actionName);
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9c8e829f/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
index 7a84e7f..d064a06 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpClientUtil.java
@@ -73,8 +73,8 @@ public class HttpClientUtil {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private static final int DEFAULT_CONNECT_TIMEOUT = 60000;
- private static final int DEFAULT_SO_TIMEOUT = 600000;
+ public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
+ public static final int DEFAULT_SO_TIMEOUT = 600000;
private static final int VALIDATE_AFTER_INACTIVITY_DEFAULT = 3000;
private static final int EVICT_IDLE_CONNECTIONS_DEFAULT = 50000;
[2/2] lucene-solr:feature/autoscaling: SOLR-10996: Implement
TriggerListener API.
Posted by ab...@apache.org.
SOLR-10996: Implement TriggerListener API.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/9c8e829f
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/9c8e829f
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/9c8e829f
Branch: refs/heads/feature/autoscaling
Commit: 9c8e829f58c78cdb88f03f747b5587b59a165423
Parents: b8c86d2
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Mon Jul 10 18:29:15 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Mon Jul 10 18:29:15 2017 +0200
----------------------------------------------------------------------
solr/CHANGES.txt | 1 +
.../solr/cloud/autoscaling/AutoScaling.java | 35 ++-
.../cloud/autoscaling/AutoScalingConfig.java | 148 ++++++++---
.../cloud/autoscaling/AutoScalingHandler.java | 4 +-
.../cloud/autoscaling/ComputePlanAction.java | 19 +-
.../cloud/autoscaling/ExecutePlanAction.java | 20 +-
.../cloud/autoscaling/HttpTriggerListener.java | 176 +++++++++++++
.../solr/cloud/autoscaling/LogPlanAction.java | 20 +-
.../cloud/autoscaling/LogTriggerListener.java | 39 +++
.../cloud/autoscaling/NodeAddedTrigger.java | 20 +-
.../solr/cloud/autoscaling/NodeLostTrigger.java | 20 +-
.../autoscaling/OverseerTriggerThread.java | 24 +-
.../cloud/autoscaling/ScheduledTriggers.java | 208 +++++++++++++++-
.../cloud/autoscaling/TriggerActionBase.java | 47 ++++
.../solr/cloud/autoscaling/TriggerListener.java | 45 ++++
.../cloud/autoscaling/TriggerListenerBase.java | 46 ++++
.../autoscaling/AutoScalingHandlerTest.java | 6 +-
.../autoscaling/HttpTriggerListenerTest.java | 209 ++++++++++++++++
.../cloud/autoscaling/NodeAddedTriggerTest.java | 18 +-
.../cloud/autoscaling/NodeLostTriggerTest.java | 18 +-
.../autoscaling/TriggerIntegrationTest.java | 248 ++++++++++++++++---
.../solr/client/solrj/impl/HttpClientUtil.java | 4 +-
22 files changed, 1171 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9c8e829f/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 81ab980..e3decb1 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -58,6 +58,7 @@ New Features
----------------------
* SOLR-11019: Add addAll Stream Evaluator (Joel Bernstein)
+* SOLR-10996: Implement TriggerListener API (ab, shalin)
Bug Fixes
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9c8e829f/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
index cd65090..fa7311c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
@@ -38,7 +38,7 @@ public class AutoScaling {
INDEXRATE
}
- public enum TriggerStage {
+ public enum EventProcessorStage {
WAITING,
STARTED,
ABORTED,
@@ -48,29 +48,26 @@ public class AutoScaling {
AFTER_ACTION
}
- public interface TriggerListener {
+ /**
+ * Implementation of this interface is used for processing events generated by a trigger.
+ */
+ public interface EventProcessor {
+
/**
- * This method is executed when a trigger is ready to fire.
+ * This method is executed for events produced by {@link Trigger#run()}.
*
* @param event a subclass of {@link TriggerEvent}
- * @return true if the listener was ready to perform actions on the event, false
+ * @return true if the processor was ready to perform actions on the event, false
* otherwise. If false was returned then callers should assume the event was discarded.
*/
- boolean triggerFired(TriggerEvent event);
- }
-
- public static class HttpCallbackListener implements TriggerListener {
- @Override
- public boolean triggerFired(TriggerEvent event) {
- return true;
- }
+ boolean process(TriggerEvent event);
}
/**
* Interface for a Solr trigger. Each trigger implements Runnable and Closeable interface. A trigger
* is scheduled using a {@link java.util.concurrent.ScheduledExecutorService} so it is executed as
- * per a configured schedule to check whether the trigger is ready to fire. The {@link Trigger#setListener(TriggerListener)}
- * method should be used to set a callback listener which is fired by implementation of this class whenever
+ * per a configured schedule to check whether the trigger is ready to fire. The {@link Trigger#setProcessor(EventProcessor)}
+ * method should be used to set a processor which is used by implementation of this class whenever
* ready.
* <p>
* As per the guarantees made by the {@link java.util.concurrent.ScheduledExecutorService} a trigger
@@ -79,7 +76,7 @@ public class AutoScaling {
* which can be get/set by a different thread than the one executing the trigger. Therefore, implementations
* should use appropriate synchronization around the listener.
* <p>
- * When a trigger is ready to fire, it calls the {@link TriggerListener#triggerFired(TriggerEvent)} event
+ * When a trigger is ready to fire, it calls the {@link EventProcessor#process(TriggerEvent)} event
* with the proper trigger event object. If that method returns false then it should be interpreted to mean
* that Solr is not ready to process this trigger event and therefore we should retain the state and fire
* at the next invocation of the run() method.
@@ -107,11 +104,11 @@ public class AutoScaling {
/** Actions to execute when event is fired. */
List<TriggerAction> getActions();
- /** Set event listener to call when event is fired. */
- void setListener(TriggerListener listener);
+ /** Set event processor to call when event is fired. */
+ void setProcessor(EventProcessor processor);
- /** Get event listener. */
- TriggerListener getListener();
+ /** Get event processor. */
+ EventProcessor getProcessor();
/** Return true when this trigger is closed and cannot be used. */
boolean isClosed();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9c8e829f/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
index 5714ac9..54e9170 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
@@ -16,44 +16,79 @@
*/
package org.apache.solr.cloud.autoscaling;
+import java.lang.invoke.MethodHandles;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Set;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Simple bean representation of <code>autoscaling.json</code>, which parses data
* lazily.
*/
public class AutoScalingConfig {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Map<String, Object> jsonMap;
private Policy policy;
private Map<String, TriggerConfig> triggers;
- private Map<String, ListenerConfig> listeners;
+ private Map<String, TriggerListenerConfig> listeners;
/**
- * Bean representation of {@link org.apache.solr.cloud.autoscaling.AutoScaling.TriggerListener} config.
+ * Bean representation of {@link TriggerListener} config.
*/
- public static class ListenerConfig {
- public String trigger;
- public List<String> stages;
- public String listenerClass;
- public List<Map<String, String>> beforeActions;
- public List<Map<String, String>> afterActions;
-
- public ListenerConfig(Map<String, Object> properties) {
+ public static class TriggerListenerConfig {
+ public final String name;
+ public final String trigger;
+ public final EnumSet<AutoScaling.EventProcessorStage> stages = EnumSet.noneOf(AutoScaling.EventProcessorStage.class);
+ public final String listenerClass;
+ public final Set<String> beforeActions;
+ public final Set<String> afterActions;
+ public final Map<String, Object> properties = new HashMap<>();
+
+ public TriggerListenerConfig(String name, Map<String, Object> properties) {
+ this.name = name;
+ this.properties.putAll(properties);
trigger = (String)properties.get(AutoScalingParams.TRIGGER);
- stages = (List<String>)properties.getOrDefault(AutoScalingParams.STAGE, Collections.emptyList());
+ List<String> stageNames = getList(AutoScalingParams.STAGE, properties);
+ for (String stageName : stageNames) {
+ try {
+ AutoScaling.EventProcessorStage stage = AutoScaling.EventProcessorStage.valueOf(stageName.toUpperCase(Locale.ROOT));
+ stages.add(stage);
+ } catch (Exception e) {
+ LOG.warn("Invalid stage name '" + name + "' in listener config, skipping: " + properties);
+ }
+ }
listenerClass = (String)properties.get(AutoScalingParams.CLASS);
- beforeActions = (List<Map<String, String>>)properties.getOrDefault(AutoScalingParams.BEFORE_ACTION, Collections.emptyList());
- afterActions = (List<Map<String, String>>)properties.getOrDefault(AutoScalingParams.AFTER_ACTION, Collections.emptyList());
+ beforeActions = new HashSet<>(getList(AutoScalingParams.BEFORE_ACTION, properties));
+ afterActions = new HashSet<>(getList(AutoScalingParams.AFTER_ACTION, properties));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TriggerListenerConfig that = (TriggerListenerConfig) o;
+
+ if (name != null ? !name.equals(that.name) : that.name != null) return false;
+ if (trigger != null ? !trigger.equals(that.trigger) : that.trigger != null) return false;
+ if (!stages.equals(that.stages)) return false;
+ if (listenerClass != null ? !listenerClass.equals(that.listenerClass) : that.listenerClass != null) return false;
+ if (!beforeActions.equals(that.beforeActions)) return false;
+ if (!afterActions.equals(that.afterActions)) return false;
+ return properties.equals(that.properties);
}
}
@@ -61,25 +96,50 @@ public class AutoScalingConfig {
* Bean representation of {@link org.apache.solr.cloud.autoscaling.AutoScaling.Trigger} config.
*/
public static class TriggerConfig {
+ public final String name;
public final AutoScaling.EventType eventType;
public final Map<String, Object> properties = new HashMap<>();
- public TriggerConfig(Map<String, Object> properties) {
+ public TriggerConfig(String name, Map<String, Object> properties) {
+ this.name = name;
String event = (String) properties.get(AutoScalingParams.EVENT);
this.eventType = AutoScaling.EventType.valueOf(event.toUpperCase(Locale.ROOT));
this.properties.putAll(properties);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TriggerConfig that = (TriggerConfig) o;
+
+ if (name != null ? !name.equals(that.name) : that.name != null) return false;
+ if (eventType != that.eventType) return false;
+ return properties.equals(that.properties);
+ }
+ }
+
+ public AutoScalingConfig(byte[] utf8) {
+ this(utf8 != null && utf8.length > 0 ? (Map<String, Object>)Utils.fromJSON(utf8) : Collections.emptyMap());
}
/**
* Construct from a JSON map representation.
- * @param jsonMap
+ * @param jsonMap JSON map representation of the config.
*/
public AutoScalingConfig(Map<String, Object> jsonMap) {
this.jsonMap = Utils.getDeepCopy(jsonMap, 10);
}
/**
+ * Return the original JSON map representation that was used for building this config.
+ */
+ public Map<String, Object> getJsonMap() {
+ return jsonMap;
+ }
+
+ /**
* Get {@link Policy} configuration.
*/
public Policy getPolicy() {
@@ -100,7 +160,7 @@ public class AutoScalingConfig {
} else {
triggers = new HashMap<>(trigMap.size());
for (Map.Entry<String, Object> entry : trigMap.entrySet()) {
- triggers.put(entry.getKey(), new TriggerConfig((Map<String, Object>)entry.getValue()));
+ triggers.put(entry.getKey(), new TriggerConfig(entry.getKey(), (Map<String, Object>)entry.getValue()));
}
}
}
@@ -108,24 +168,6 @@ public class AutoScalingConfig {
}
/**
- * Get listener configurations.
- */
- public Map<String, ListenerConfig> getListenerConfigs() {
- if (listeners == null) {
- Map<String, Object> map = (Map<String, Object>)jsonMap.get("listeners");
- if (map == null) {
- listeners = Collections.emptyMap();
- } else {
- listeners = new HashMap<>(map.size());
- for (Map.Entry<String, Object> entry : map.entrySet()) {
- listeners.put(entry.getKey(), new ListenerConfig((Map<String, Object>)entry.getValue()));
- }
- }
- }
- return listeners;
- }
-
- /**
* Check whether triggers for specific event type exist.
* @param types list of event types
* @return true if there's at least one trigger matching at least one event type,
@@ -144,4 +186,42 @@ public class AutoScalingConfig {
}
return false;
}
+
+ /**
+ * Get listener configurations.
+ */
+ public Map<String, TriggerListenerConfig> getTriggerListenerConfigs() {
+ if (listeners == null) {
+ Map<String, Object> map = (Map<String, Object>)jsonMap.get("listeners");
+ if (map == null) {
+ listeners = Collections.emptyMap();
+ } else {
+ listeners = new HashMap<>(map.size());
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ listeners.put(entry.getKey(), new TriggerListenerConfig(entry.getKey(), (Map<String, Object>)entry.getValue()));
+ }
+ }
+ }
+ return listeners;
+ }
+
+ private static List<String> getList(String key, Map<String, Object> properties) {
+ return getList(key, properties, null);
+ }
+
+ private static List<String> getList(String key, Map<String, Object> properties, List<String> defaultList) {
+ if (defaultList == null) {
+ defaultList = Collections.emptyList();
+ }
+ Object o = properties.get(key);
+ if (o == null) {
+ return defaultList;
+ }
+ if (o instanceof List) {
+ return (List)o;
+ } else {
+ return Collections.singletonList(String.valueOf(o));
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9c8e829f/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
index ed29542..e730088 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
@@ -379,7 +379,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
for (String stage : stageNames) {
try {
- AutoScaling.TriggerStage.valueOf(stage);
+ AutoScaling.EventProcessorStage.valueOf(stage);
} catch (IllegalArgumentException e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid stage name: " + stage);
}
@@ -391,7 +391,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
// validate that we can load the listener class
// todo nocommit -- what about MemClassLoader?
try {
- container.getResourceLoader().findClass(listenerClass, AutoScaling.TriggerListener.class);
+ container.getResourceLoader().findClass(listenerClass, TriggerListener.class);
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Listener not found: " + listenerClass, e);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9c8e829f/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
index 85baf71..cfd9ca3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
@@ -42,26 +42,9 @@ import org.slf4j.LoggerFactory;
* The cluster operations computed here are put into the {@link ActionContext}'s properties
* with the key name "operations". The value is a List of SolrRequest objects.
*/
-public class ComputePlanAction implements TriggerAction {
+public class ComputePlanAction extends TriggerActionBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private Map<String, String> initArgs;
-
- @Override
- public void close() throws IOException {
-
- }
-
- @Override
- public void init(Map<String, String> args) {
- this.initArgs = args;
- }
-
- @Override
- public String getName() {
- return initArgs.get("name");
- }
-
@Override
public void process(TriggerEvent event, ActionContext context) {
log.debug("-- processing event: {} with context properties: {}", event, context.getProperties());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9c8e829f/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
index 86e24f4..2cd9824 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ExecutePlanAction.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
@@ -38,26 +37,9 @@ import org.slf4j.LoggerFactory;
* This class is responsible for executing cluster operations read from the {@link ActionContext}'s properties
* with the key name "operations"
*/
-public class ExecutePlanAction implements TriggerAction {
+public class ExecutePlanAction extends TriggerActionBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private Map<String, String> initArgs;
-
- @Override
- public void close() throws IOException {
-
- }
-
- @Override
- public void init(Map<String, String> args) {
- this.initArgs = args;
- }
-
- @Override
- public String getName() {
- return initArgs.get("name");
- }
-
@Override
public void process(TriggerEvent event, ActionContext context) {
log.debug("-- processing event: {} with context properties: {}", event, context.getProperties());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9c8e829f/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
new file mode 100644
index 0000000..e50417f
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringJoiner;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.protocol.HttpClientContext;
+import org.apache.http.entity.StringEntity;
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.util.PropertiesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple HTTP callback that POSTs event data to a URL.
+ * URL, payload and headers may contain property substitution patterns, with the following properties available:
+ * <ul>
+ * <li>config.* - listener configuration</li>
+ * <li>event.* - event properties</li>
+ * <li>stage - current stage of event processing</li>
+ * <li>actionName - optional current action name</li>
+ * <li>context.* - optional {@link ActionContext} properties</li>
+ * <li>error - optional error string (from {@link Throwable#toString()})</li>
+ * <li>message - optional message</li>
+ * </ul>
+ * The following listener configuration is supported:
+ * <ul>
+ * <li>url - a URL template</li>
+ * <li>payload - string, optional payload template. If absent a JSON map of all properties listed above will be used.</li>
+ * <li>contentType - string, optional payload content type. If absent then <code>application/json</code> will be used.</li>
+ * <li>header.* - string, optional header template(s). The name of the property without "header." prefix defines the literal header name.</li>
+ * <li>timeout - int, optional connection and socket timeout in milliseconds. Default is 60 seconds.</li>
+ * <li>followRedirects - boolean, optional setting to follow redirects. Default is false.</li>
+ * </ul>
+ */
+public class HttpTriggerListener extends TriggerListenerBase {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private HttpClient httpClient;
+ private String urlTemplate;
+ private String payloadTemplate;
+ private String contentType;
+ private Map<String, String> headerTemplates = new HashMap<>();
+ private int timeout = HttpClientUtil.DEFAULT_CONNECT_TIMEOUT;
+ private boolean followRedirects;
+
+ @Override
+ public void init(CoreContainer coreContainer, AutoScalingConfig.TriggerListenerConfig config) {
+ super.init(coreContainer, config);
+ httpClient = coreContainer.getUpdateShardHandler().getHttpClient();
+ urlTemplate = (String)config.properties.get("url");
+ payloadTemplate = (String)config.properties.get("payload");
+ contentType = (String)config.properties.get("contentType");
+ config.properties.forEach((k, v) -> {
+ if (k.startsWith("header.")) {
+ headerTemplates.put(k.substring(7), String.valueOf(v));
+ }
+ });
+ timeout = PropertiesUtil.toInteger(String.valueOf(config.properties.get("timeout")), HttpClientUtil.DEFAULT_CONNECT_TIMEOUT);
+ followRedirects = PropertiesUtil.toBoolean(String.valueOf(config.properties.get("followRedirects")));
+ }
+
+ @Override
+ public void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) {
+ Properties properties = new Properties();
+ properties.setProperty("stage", stage.toString());
+ // if configuration used "actionName" but we're in a non-action related stage then PropertiesUtil will
+ // throws an exception on missing value - so replace it with an empty string
+ if (actionName == null) {
+ actionName = "";
+ }
+ properties.setProperty("actionName", actionName);
+ if (context != null) {
+ context.getProperties().forEach((k, v) -> {
+ properties.setProperty("context." + k, String.valueOf(v));
+ });
+ }
+ if (error != null) {
+ properties.setProperty("error", error.toString());
+ } else {
+ properties.setProperty("error", "");
+ }
+ if (message != null) {
+ properties.setProperty("message", message);
+ } else {
+ properties.setProperty("message", "");
+ }
+ // add event properties
+ properties.setProperty("event.id", event.getId());
+ properties.setProperty("event.source", event.getSource());
+ properties.setProperty("event.eventTime", String.valueOf(event.eventTime));
+ properties.setProperty("event.eventType", event.getEventType().toString());
+ event.getProperties().forEach((k, v) -> {
+ properties.setProperty("event.properties." + k, String.valueOf(v));
+ });
+ // add config properties
+ properties.setProperty("config.name", config.name);
+ properties.setProperty("config.trigger", config.trigger);
+ properties.setProperty("config.listenerClass", config.listenerClass);
+ properties.setProperty("config.beforeActions", String.join(",", config.beforeActions));
+ properties.setProperty("config.afterActions", String.join(",", config.afterActions));
+ StringJoiner joiner = new StringJoiner(",");
+ config.stages.forEach(s -> joiner.add(s.toString()));
+ properties.setProperty("config.stages", joiner.toString());
+ config.properties.forEach((k, v) -> {
+ properties.setProperty("config.properties." + k, String.valueOf(v));
+ });
+ String url = PropertiesUtil.substituteProperty(urlTemplate, properties);
+ String payload;
+ String type;
+ if (payloadTemplate != null) {
+ payload = PropertiesUtil.substituteProperty(payloadTemplate, properties);
+ if (contentType != null) {
+ type = contentType;
+ } else {
+ type = "application/json";
+ }
+ } else {
+ payload = Utils.toJSONString(properties);
+ type = "application/json";
+ }
+ HttpPost post = new HttpPost(url);
+ HttpEntity entity = new StringEntity(payload, "UTF-8");
+ headerTemplates.forEach((k, v) -> {
+ String headerVal = PropertiesUtil.substituteProperty(v, properties);
+ if (!headerVal.isEmpty()) {
+ post.addHeader(k, headerVal);
+ }
+ });
+ post.setEntity(entity);
+ post.setHeader("Content-Type", type);
+ org.apache.http.client.config.RequestConfig.Builder requestConfigBuilder = HttpClientUtil.createDefaultRequestConfigBuilder();
+ requestConfigBuilder.setSocketTimeout(timeout);
+ requestConfigBuilder.setConnectTimeout(timeout);
+ requestConfigBuilder.setRedirectsEnabled(followRedirects);
+
+ post.setConfig(requestConfigBuilder.build());
+ try {
+ HttpClientContext httpClientRequestContext = HttpClientUtil.createNewHttpClientRequestContext();
+ HttpResponse rsp = httpClient.execute(post, httpClientRequestContext);
+ int statusCode = rsp.getStatusLine().getStatusCode();
+ if (statusCode != 200) {
+ LOG.warn("Error sending request for event " + event + ", HTTP response: " + rsp.toString());
+ }
+ HttpEntity responseEntity = rsp.getEntity();
+ Utils.consumeFully(responseEntity);
+ } catch (IOException e) {
+ LOG.warn("Exception sending request for event " + event, e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9c8e829f/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogPlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogPlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogPlanAction.java
index 7b2de80..45107c1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogPlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogPlanAction.java
@@ -17,28 +17,10 @@
package org.apache.solr.cloud.autoscaling;
-import java.io.IOException;
-import java.util.Map;
-
/**
* todo nocommit
*/
-public class LogPlanAction implements TriggerAction {
- @Override
- public void close() throws IOException {
-
- }
-
- @Override
- public void init(Map<String, String> args) {
-
- }
-
- @Override
- public String getName() {
- return null;
- }
-
+public class LogPlanAction extends TriggerActionBase {
@Override
public void process(TriggerEvent event, ActionContext actionContext) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9c8e829f/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java
new file mode 100644
index 0000000..108f41b
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud.autoscaling;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+
+import org.apache.solr.core.CoreContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link TriggerListener} that reports
+ * events to a log.
+ */
+public class LogTriggerListener extends TriggerListenerBase {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @Override
+ public void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName, ActionContext context,
+ Throwable error, String message) {
+ LOG.info("{}: stage={}, actionName={}, event={}, error={}, messsage={}", config.name, stage, actionName, event, error, message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9c8e829f/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
index 4162ef2..c51e586 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
@@ -52,7 +52,7 @@ public class NodeAddedTrigger extends TriggerBase {
private final Map<String, Object> properties;
private final CoreContainer container;
private final List<TriggerAction> actions;
- private final AtomicReference<AutoScaling.TriggerListener> listenerRef;
+ private final AtomicReference<AutoScaling.EventProcessor> processorRef;
private final boolean enabled;
private final int waitForSecond;
private final AutoScaling.EventType eventType;
@@ -71,7 +71,7 @@ public class NodeAddedTrigger extends TriggerBase {
this.properties = properties;
this.container = container;
this.timeSource = TimeSource.CURRENT_TIME;
- this.listenerRef = new AtomicReference<>();
+ this.processorRef = new AtomicReference<>();
List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
if (o != null && !o.isEmpty()) {
actions = new ArrayList<>(3);
@@ -119,13 +119,13 @@ public class NodeAddedTrigger extends TriggerBase {
}
@Override
- public void setListener(AutoScaling.TriggerListener listener) {
- listenerRef.set(listener);
+ public void setProcessor(AutoScaling.EventProcessor processor) {
+ processorRef.set(processor);
}
@Override
- public AutoScaling.TriggerListener getListener() {
- return listenerRef.get();
+ public AutoScaling.EventProcessor getProcessor() {
+ return processorRef.get();
}
@Override
@@ -254,10 +254,10 @@ public class NodeAddedTrigger extends TriggerBase {
long now = timeSource.getTime();
if (TimeUnit.SECONDS.convert(now - timeAdded, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
// fire!
- AutoScaling.TriggerListener listener = listenerRef.get();
- if (listener != null) {
- log.debug("NodeAddedTrigger {} firing registered listener for node: {} added at time {} , now: {}", name, nodeName, timeAdded, now);
- if (listener.triggerFired(new NodeAddedEvent(getEventType(), getName(), timeAdded, nodeName))) {
+ AutoScaling.EventProcessor processor = processorRef.get();
+ if (processor != null) {
+ log.debug("NodeAddedTrigger {} firing registered processor for node: {} added at time {} , now: {}", name, nodeName, timeAdded, now);
+ if (processor.process(new NodeAddedEvent(getEventType(), getName(), timeAdded, nodeName))) {
// remove from tracking set only if the fire was accepted
it.remove();
removeNodeAddedMarker(nodeName);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9c8e829f/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
index bd9b745..caf051b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -52,7 +52,7 @@ public class NodeLostTrigger extends TriggerBase {
private final Map<String, Object> properties;
private final CoreContainer container;
private final List<TriggerAction> actions;
- private final AtomicReference<AutoScaling.TriggerListener> listenerRef;
+ private final AtomicReference<AutoScaling.EventProcessor> processorRef;
private final boolean enabled;
private final int waitForSecond;
private final AutoScaling.EventType eventType;
@@ -71,7 +71,7 @@ public class NodeLostTrigger extends TriggerBase {
this.properties = properties;
this.container = container;
this.timeSource = TimeSource.CURRENT_TIME;
- this.listenerRef = new AtomicReference<>();
+ this.processorRef = new AtomicReference<>();
List<Map<String, String>> o = (List<Map<String, String>>) properties.get("actions");
if (o != null && !o.isEmpty()) {
actions = new ArrayList<>(3);
@@ -117,13 +117,13 @@ public class NodeLostTrigger extends TriggerBase {
}
@Override
- public void setListener(AutoScaling.TriggerListener listener) {
- listenerRef.set(listener);
+ public void setProcessor(AutoScaling.EventProcessor processor) {
+ processorRef.set(processor);
}
@Override
- public AutoScaling.TriggerListener getListener() {
- return listenerRef.get();
+ public AutoScaling.EventProcessor getProcessor() {
+ return processorRef.get();
}
@Override
@@ -249,10 +249,10 @@ public class NodeLostTrigger extends TriggerBase {
Long timeRemoved = entry.getValue();
if (TimeUnit.SECONDS.convert(timeSource.getTime() - timeRemoved, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
// fire!
- AutoScaling.TriggerListener listener = listenerRef.get();
- if (listener != null) {
- log.debug("NodeLostTrigger firing registered listener for lost node: {}", nodeName);
- if (listener.triggerFired(new NodeLostEvent(getEventType(), getName(), timeRemoved, nodeName))) {
+ AutoScaling.EventProcessor processor = processorRef.get();
+ if (processor != null) {
+ log.debug("NodeLostTrigger firing registered processor for lost node: {}", nodeName);
+ if (processor.process(new NodeLostEvent(getEventType(), getName(), timeRemoved, nodeName))) {
it.remove();
removeNodeLostMarker(nodeName);
} else {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9c8e829f/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
index d7fc47e..3666e1b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
@@ -37,6 +37,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -75,6 +76,8 @@ public class OverseerTriggerThread implements Runnable, Closeable {
private boolean isClosed = false;
+ private AutoScalingConfig autoScalingConfig;
+
public OverseerTriggerThread(ZkController zkController) {
this.zkController = zkController;
zkStateReader = zkController.getZkStateReader();
@@ -163,6 +166,9 @@ public class OverseerTriggerThread implements Runnable, Closeable {
break;
}
+ // update the current config
+ scheduledTriggers.setAutoScalingConfig(autoScalingConfig);
+
Set<String> managedTriggerNames = scheduledTriggers.getScheduledTriggerNames();
// remove the triggers which are no longer active
for (String managedTriggerName : managedTriggerNames) {
@@ -265,8 +271,9 @@ public class OverseerTriggerThread implements Runnable, Closeable {
// protect against reordered watcher fires by ensuring that we only move forward
return;
}
+ autoScalingConfig = new AutoScalingConfig(data);
znodeVersion = stat.getVersion();
- Map<String, AutoScaling.Trigger> triggerMap = loadTriggers(triggerFactory, data);
+ Map<String, AutoScaling.Trigger> triggerMap = loadTriggers(triggerFactory, autoScalingConfig);
// remove all active triggers that have been removed from ZK
Set<String> trackingKeySet = activeTriggers.keySet();
@@ -288,22 +295,19 @@ public class OverseerTriggerThread implements Runnable, Closeable {
}
}
- private static Map<String, AutoScaling.Trigger> loadTriggers(AutoScaling.TriggerFactory triggerFactory, byte[] data) {
- ZkNodeProps loaded = ZkNodeProps.load(data);
- Map<String, Object> triggers = (Map<String, Object>) loaded.get("triggers");
-
+ private static Map<String, AutoScaling.Trigger> loadTriggers(AutoScaling.TriggerFactory triggerFactory, AutoScalingConfig autoScalingConfig) {
+ Map<String, AutoScalingConfig.TriggerConfig> triggers = autoScalingConfig.getTriggerConfigs();
if (triggers == null) {
return Collections.emptyMap();
}
Map<String, AutoScaling.Trigger> triggerMap = new HashMap<>(triggers.size());
- for (Map.Entry<String, Object> entry : triggers.entrySet()) {
- Map<String, Object> props = (Map<String, Object>) entry.getValue();
- String event = (String) props.get(AutoScalingParams.EVENT);
- AutoScaling.EventType eventType = AutoScaling.EventType.valueOf(event.toUpperCase(Locale.ROOT));
+ for (Map.Entry<String, AutoScalingConfig.TriggerConfig> entry : triggers.entrySet()) {
+ AutoScalingConfig.TriggerConfig cfg = entry.getValue();
+ AutoScaling.EventType eventType = cfg.eventType;
String triggerName = entry.getKey();
- triggerMap.put(triggerName, triggerFactory.create(eventType, triggerName, props));
+ triggerMap.put(triggerName, triggerFactory.create(eventType, triggerName, cfg.properties));
}
return triggerMap;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9c8e829f/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
index a15b2d1..4b7c0d0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
@@ -24,7 +24,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -34,6 +36,8 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.solr.cloud.ActionThrottle;
@@ -85,6 +89,10 @@ public class ScheduledTriggers implements Closeable {
private final CoreContainer coreContainer;
+ private final TriggerListeners listeners;
+
+ private AutoScalingConfig autoScalingConfig;
+
public ScheduledTriggers(ZkController zkController) {
// todo make the core pool size configurable
// it is important to use more than one because a time taking trigger can starve other scheduled triggers
@@ -98,9 +106,20 @@ public class ScheduledTriggers implements Closeable {
actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor"));
// todo make the wait time configurable
actionThrottle = new ActionThrottle("action", DEFAULT_MIN_MS_BETWEEN_ACTIONS);
- this.coreContainer = zkController.getCoreContainer();
- this.zkClient = zkController.getZkClient();
+ coreContainer = zkController.getCoreContainer();
+ zkClient = zkController.getZkClient();
queueStats = new Overseer.Stats();
+ listeners = new TriggerListeners();
+ }
+
+ /**
+ * Set the current autoscaling config. This is invoked by {@link OverseerTriggerThread} when autoscaling.json is updated,
+ * and it re-initializes trigger listeners.
+ * @param autoScalingConfig current autoscaling.json
+ */
+ public void setAutoScalingConfig(AutoScalingConfig autoScalingConfig) {
+ this.autoScalingConfig = autoScalingConfig;
+ listeners.setAutoScalingConfig(autoScalingConfig);
}
/**
@@ -127,20 +146,25 @@ public class ScheduledTriggers implements Closeable {
scheduledTrigger.setReplay(false);
scheduledTriggers.replace(newTrigger.getName(), scheduledTrigger);
}
- newTrigger.setListener(event -> {
+ newTrigger.setProcessor(event -> {
ScheduledTrigger scheduledSource = scheduledTriggers.get(event.getSource());
if (scheduledSource == null) {
- log.warn("Ignoring autoscaling event " + event + " because the source trigger: " + event.getSource() + " doesn't exist.");
+ String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the source trigger: %s doesn't exist.", event.toString(), event.getSource());
+ listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.FAILED, msg);
+ log.warn(msg);
return false;
}
boolean replaying = event.getProperty(TriggerEvent.REPLAYING) != null ? (Boolean)event.getProperty(TriggerEvent.REPLAYING) : false;
AutoScaling.Trigger source = scheduledSource.trigger;
if (source.isClosed()) {
- log.warn("Ignoring autoscaling event " + event + " because the source trigger: " + source + " has already been closed");
+ String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the source trigger: %s has already been closed", event.toString(), source);
+ listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.ABORTED, msg);
+ log.warn(msg);
// we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
return false;
}
if (hasPendingActions.compareAndSet(false, true)) {
+ listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.STARTED);
final boolean enqueued;
if (replaying) {
enqueued = false;
@@ -158,17 +182,21 @@ public class ScheduledTriggers implements Closeable {
actionThrottle.markAttemptingAction();
ActionContext actionContext = new ActionContext(coreContainer, newTrigger, new HashMap<>());
for (TriggerAction action : actions) {
+ listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.BEFORE_ACTION, action.getName(), actionContext);
try {
action.process(event, actionContext);
} catch (Exception e) {
+ listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.FAILED, action.getName(), actionContext, e, null);
log.error("Error executing action: " + action.getName() + " for trigger event: " + event, e);
throw e;
}
+ listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.AFTER_ACTION, action.getName(), actionContext);
}
if (enqueued) {
TriggerEvent ev = scheduledTrigger.dequeue();
assert ev.getId().equals(event.getId());
}
+ listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.SUCCEEDED);
} finally {
hasPendingActions.set(false);
}
@@ -181,6 +209,7 @@ public class ScheduledTriggers implements Closeable {
+ " is broken! Expected event=" + event + " but got " + ev);
}
}
+ listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.SUCCEEDED);
hasPendingActions.set(false);
}
return true;
@@ -252,6 +281,7 @@ public class ScheduledTriggers implements Closeable {
// guarantee about cluster state
scheduledThreadPoolExecutor.shutdownNow();
actionExecutor.shutdownNow();
+ listeners.close();
}
private class ScheduledTrigger implements Runnable, Closeable {
@@ -303,7 +333,7 @@ public class ScheduledTriggers implements Closeable {
while ((event = queue.peekEvent()) != null) {
// override REPLAYING=true
event.getProperties().put(TriggerEvent.REPLAYING, true);
- if (! trigger.getListener().triggerFired(event)) {
+ if (! trigger.getProcessor().process(event)) {
log.error("Failed to re-play event, discarding: " + event);
}
queue.pollEvent(); // always remove it from queue
@@ -339,4 +369,170 @@ public class ScheduledTriggers implements Closeable {
IOUtils.closeQuietly(trigger);
}
}
+
+ private class TriggerListeners {
+ Map<String, Map<AutoScaling.EventProcessorStage, List<TriggerListener>>> listenersPerStage = new HashMap<>();
+ Map<String, TriggerListener> listenersPerName = new HashMap<>();
+ ReentrantLock updateLock = new ReentrantLock();
+
+ void setAutoScalingConfig(AutoScalingConfig autoScalingConfig) {
+ updateLock.lock();
+ // we will recreate this from scratch
+ listenersPerStage.clear();
+ try {
+ Set<String> triggerNames = autoScalingConfig.getTriggerConfigs().keySet();
+ Map<String, AutoScalingConfig.TriggerListenerConfig> configs = autoScalingConfig.getTriggerListenerConfigs();
+ Set<String> listenerNames = configs.entrySet().stream().map(entry -> entry.getValue().name).collect(Collectors.toSet());
+ // close those for non-existent triggers and nonexistent listener configs
+ for (Iterator<Map.Entry<String, TriggerListener>> it = listenersPerName.entrySet().iterator(); it.hasNext(); ) {
+ Map.Entry<String, TriggerListener> entry = it.next();
+ String name = entry.getKey();
+ TriggerListener listener = entry.getValue();
+ if (!triggerNames.contains(listener.getConfig().trigger) || !listenerNames.contains(name)) {
+ try {
+ listener.close();
+ } catch (Exception e) {
+ log.warn("Exception closing old listener " + listener.getConfig(), e);
+ }
+ it.remove();
+ }
+ }
+ for (Map.Entry<String, AutoScalingConfig.TriggerListenerConfig> entry : configs.entrySet()) {
+ AutoScalingConfig.TriggerListenerConfig config = entry.getValue();
+ if (!triggerNames.contains(config.trigger)) {
+ log.debug("-- skipping listener for non-existent trigger: {}", config);
+ continue;
+ }
+ // find previous instance and reuse if possible
+ TriggerListener oldListener = listenersPerName.get(config.name);
+ TriggerListener listener = null;
+ if (oldListener != null) {
+ if (!oldListener.getConfig().equals(config)) { // changed config
+ try {
+ oldListener.close();
+ } catch (Exception e) {
+ log.warn("Exception closing old listener " + listener.getConfig(), e);
+ }
+ } else {
+ listener = oldListener; // reuse
+ }
+ }
+ if (listener == null) { // create new instance
+ String clazz = config.listenerClass;
+ try {
+ listener = coreContainer.getResourceLoader().newInstance(clazz, TriggerListener.class);
+ } catch (Exception e) {
+ log.warn("Invalid TriggerListener class name '" + clazz + "', skipping...", e);
+ }
+ try {
+ listener.init(coreContainer, config);
+ listenersPerName.put(config.name, listener);
+ } catch (Exception e) {
+ log.warn("Error initializing TriggerListener " + config, e);
+ try {
+ listener.close();
+ } catch (Exception e1) {
+ // ignore
+ }
+ listener = null;
+ }
+ }
+ if (listener == null) {
+ continue;
+ }
+ // add per stage
+ for (AutoScaling.EventProcessorStage stage : config.stages) {
+ addPerStage(config.trigger, stage, listener);
+ }
+ // add also for beforeAction / afterAction TriggerStage
+ if (!config.beforeActions.isEmpty()) {
+ addPerStage(config.trigger, AutoScaling.EventProcessorStage.BEFORE_ACTION, listener);
+ }
+ if (!config.afterActions.isEmpty()) {
+ addPerStage(config.trigger, AutoScaling.EventProcessorStage.AFTER_ACTION, listener);
+ }
+ }
+ } finally {
+ updateLock.unlock();
+ }
+ }
+
+ private void addPerStage(String triggerName, AutoScaling.EventProcessorStage stage, TriggerListener listener) {
+ Map<AutoScaling.EventProcessorStage, List<TriggerListener>> perStage =
+ listenersPerStage.computeIfAbsent(triggerName, k -> new HashMap<>());
+ List<TriggerListener> lst = perStage.computeIfAbsent(stage, k -> new ArrayList<>(3));
+ lst.add(listener);
+ }
+
+ void reset() {
+ updateLock.lock();
+ try {
+ listenersPerStage.clear();
+ for (TriggerListener listener : listenersPerName.values()) {
+ IOUtils.closeQuietly(listener);
+ }
+ listenersPerName.clear();
+ } finally {
+ updateLock.unlock();
+ }
+ }
+
+ void close() {
+ reset();
+ }
+
+ List<TriggerListener> getTriggerListeners(String trigger, AutoScaling.EventProcessorStage stage) {
+ Map<AutoScaling.EventProcessorStage, List<TriggerListener>> perStage = listenersPerStage.get(trigger);
+ if (perStage == null) {
+ return Collections.emptyList();
+ }
+ List<TriggerListener> lst = perStage.get(stage);
+ if (lst == null) {
+ return Collections.emptyList();
+ } else {
+ return Collections.unmodifiableList(lst);
+ }
+ }
+
+ void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage) {
+ fireListeners(trigger, event, stage, null, null, null, null);
+ }
+
+ void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage, String message) {
+ fireListeners(trigger, event, stage, null, null, null, message);
+ }
+
+ void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName,
+ ActionContext context) {
+ fireListeners(trigger, event, stage, actionName, context, null, null);
+ }
+
+ void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName,
+ ActionContext context, Throwable error, String message) {
+ updateLock.lock();
+ try {
+ for (TriggerListener listener : getTriggerListeners(trigger, stage)) {
+ if (actionName != null) {
+ AutoScalingConfig.TriggerListenerConfig config = listener.getConfig();
+ if (stage == AutoScaling.EventProcessorStage.BEFORE_ACTION) {
+ if (!config.beforeActions.contains(actionName)) {
+ continue;
+ }
+ } else if (stage == AutoScaling.EventProcessorStage.AFTER_ACTION) {
+ if (!config.afterActions.contains(actionName)) {
+ continue;
+ }
+ }
+ }
+ try {
+ listener.onEvent(event, stage, actionName, context, error, message);
+ } catch (Exception e) {
+ log.warn("Exception running listener " + listener.getConfig(), e);
+ }
+ }
+ } finally {
+ updateLock.unlock();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9c8e829f/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerActionBase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerActionBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerActionBase.java
new file mode 100644
index 0000000..75c4a87
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerActionBase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Base class for {@link TriggerAction} implementations.
+ */
+public abstract class TriggerActionBase implements TriggerAction {
+
+ protected Map<String, String> initArgs;
+
+ @Override
+ public String getName() {
+ if (initArgs != null) {
+ return initArgs.get("name");
+ } else {
+ return getClass().getSimpleName();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public void init(Map<String, String> args) {
+ this.initArgs = args;
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9c8e829f/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java
new file mode 100644
index 0000000..479de49
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling;
+
+import java.io.Closeable;
+
+import org.apache.solr.core.CoreContainer;
+
+/**
+ * Implementations of this interface are notified of stages in event processing that they were
+ * registered for. Note: instances may be closed and re-created on each auto-scaling config update.
+ */
+public interface TriggerListener extends Closeable {
+
+ void init(CoreContainer coreContainer, AutoScalingConfig.TriggerListenerConfig config) throws Exception;
+
+ AutoScalingConfig.TriggerListenerConfig getConfig();
+
+ /**
+ * This method is called when either a particular <code>stage</code> or
+ * <code>actionName</code> is reached during event processing.
+ * @param event current event being processed
+ * @param stage {@link AutoScaling.EventProcessorStage} that this listener was registered for, or null
+ * @param actionName {@link TriggerAction} name that this listener was registered for, or null
+ * @param context optional {@link ActionContext} when the processing stage is related to an action, or null
+ * @param error optional {@link Throwable} error, or null
+ * @param message optional message
+ */
+ void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName, ActionContext context,
+ Throwable error, String message) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9c8e829f/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java
new file mode 100644
index 0000000..1cefa0e
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling;
+
+import java.io.IOException;
+
+import org.apache.solr.core.CoreContainer;
+
+/**
+ * Base class for implementations of {@link TriggerListener}.
+ */
+public abstract class TriggerListenerBase implements TriggerListener {
+
+ protected AutoScalingConfig.TriggerListenerConfig config;
+ protected CoreContainer coreContainer;
+
+ @Override
+ public void init(CoreContainer coreContainer, AutoScalingConfig.TriggerListenerConfig config) {
+ this.coreContainer = coreContainer;
+ this.config = config;
+ }
+
+ @Override
+ public AutoScalingConfig.TriggerListenerConfig getConfig() {
+ return config;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9c8e829f/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
index df53b7b..fb000b9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/AutoScalingHandlerTest.java
@@ -356,7 +356,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
"'trigger' : 'node_lost_trigger'," +
"'stage' : ['STARTED','ABORTED','SUCCEEDED']," +
"'beforeAction' : 'execute_plan'," +
- "'class' : 'org.apache.solr.cloud.autoscaling.AutoScaling$HttpCallbackListener'," +
+ "'class' : 'org.apache.solr.cloud.autoscaling.HttpTriggerListener'," +
"'url' : 'http://xyz.com/on_node_lost?node={$LOST_NODE_NAME}'" +
"}" +
"}";
@@ -371,7 +371,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
assertTrue(listeners.containsKey("xyz"));
Map<String, Object> xyzListener = (Map<String, Object>) listeners.get("xyz");
assertEquals(5, xyzListener.size());
- assertEquals("org.apache.solr.cloud.autoscaling.AutoScaling$HttpCallbackListener", xyzListener.get("class").toString());
+ assertEquals("org.apache.solr.cloud.autoscaling.HttpTriggerListener", xyzListener.get("class").toString());
String removeTriggerCommand = "{" +
"'remove-trigger' : {" +
@@ -422,7 +422,7 @@ public class AutoScalingHandlerTest extends SolrCloudTestCase {
"'trigger' : 'node_lost_trigger'," +
"'stage' : ['STARTED','ABORTED','SUCCEEDED']," +
"'beforeAction' : 'execute_plan'," +
- "'class' : 'org.apache.solr.cloud.autoscaling.AutoScaling$HttpCallbackListener'," +
+ "'class' : 'org.apache.solr.cloud.autoscaling.AutoScaling$HttpTriggerListener'," +
"'url' : 'http://xyz.com/on_node_lost?node={$LOST_NODE_NAME}'" +
"}" +
"}";
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9c8e829f/solr/core/src/test/org/apache/solr/cloud/autoscaling/HttpTriggerListenerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/HttpTriggerListenerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/HttpTriggerListenerTest.java
new file mode 100644
index 0000000..528162e
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/HttpTriggerListenerTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling;
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.util.NamedList;
+import org.apache.solr.util.LogLevel;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.solr.cloud.autoscaling.AutoScalingHandlerTest.createAutoScalingRequest;
+
+/**
+ *
+ */
+@LogLevel("org.apache.solr.cloud.autoscaling=DEBUG")
+@SolrTestCaseJ4.SuppressSSL
+public class HttpTriggerListenerTest extends SolrCloudTestCase {
+
+ private static CountDownLatch triggerFiredLatch;
+
+ private MockService mockService;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(2)
+ .addConfig("conf", configset("cloud-minimal"))
+ .configure();
+ }
+
+ @Before
+ public void setupTest() throws Exception {
+ mockService = new MockService();
+ mockService.start();
+ triggerFiredLatch = new CountDownLatch(1);
+ }
+
+ @After
+ public void teardownTest() throws Exception {
+ if (mockService != null) {
+ mockService.close();
+ }
+ }
+
+ @Test
+ public void testHttpListenerIntegration() throws Exception {
+ CloudSolrClient solrClient = cluster.getSolrClient();
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'node_added_trigger'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor' : '0s'," +
+ "'enabled' : true," +
+ "'actions' : [" +
+ "{'name':'test','class':'" + TestDummyAction.class.getName() + "'}" +
+ "]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ String setListenerCommand = "{" +
+ "'set-listener' : " +
+ "{" +
+ "'name' : 'foo'," +
+ "'trigger' : 'node_added_trigger'," +
+ "'stage' : ['WAITING', 'STARTED','ABORTED','SUCCEEDED', 'FAILED']," +
+ "'beforeAction' : 'test'," +
+ "'afterAction' : ['test']," +
+ "'class' : '" + HttpTriggerListener.class.getName() + "'," +
+ "'url' : '" + mockService.server.getURI().toString() + "/${config.name:invalid}/${config.properties.xyz:invalid}/${stage}'," +
+ "'payload': 'actionName=${actionName}, source=${event.source}, type=${event.eventType}'," +
+ "'header.X-Foo' : '${config.name:invalid}'," +
+ "'xyz': 'foo'" +
+ "}" +
+ "}";
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setListenerCommand);
+ response = solrClient.request(req);
+ assertEquals(response.get("result").toString(), "success");
+
+ assertEquals(requests.toString(), 0, requests.size());
+
+ cluster.startJettySolrRunner();
+ boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
+ assertTrue("The trigger did not fire at all", await);
+
+ Thread.sleep(5000);
+
+ assertEquals(requests.toString(), 4, requests.size());
+ requests.forEach(s -> assertTrue(s.contains("Content-Type: application/json")));
+ requests.forEach(s -> assertTrue(s.contains("X-Foo: foo")));
+ requests.forEach(s -> assertTrue(s.contains("source=node_added_trigger")));
+ requests.forEach(s -> assertTrue(s.contains("type=NODEADDED")));
+
+ String request = requests.get(0);
+ assertTrue(request, request.startsWith("/foo/foo/STARTED"));
+ assertTrue(request, request.contains("actionName=,")); // empty actionName
+
+ request = requests.get(1);
+ assertTrue(request, request.startsWith("/foo/foo/BEFORE_ACTION"));
+ assertTrue(request, request.contains("actionName=test,")); // actionName
+
+ request = requests.get(2);
+ assertTrue(request, request.startsWith("/foo/foo/AFTER_ACTION"));
+ assertTrue(request, request.contains("actionName=test,")); // actionName
+
+ request = requests.get(3);
+ assertTrue(request, request.startsWith("/foo/foo/SUCCEEDED"));
+ assertTrue(request, request.contains("actionName=,")); // empty actionName
+ }
+
+ public static class TestDummyAction extends TriggerActionBase {
+
+ @Override
+ public void process(TriggerEvent event, ActionContext context) {
+ triggerFiredLatch.countDown();
+ }
+ }
+
+
+ static List<String> requests = new ArrayList<>();
+
+ private static class MockService extends Thread {
+ Server server;
+
+ public void start() {
+ server = new Server(new InetSocketAddress("localhost", 0));
+ server.setHandler(new AbstractHandler() {
+ @Override
+ public void handle(String s, Request request, HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws IOException, ServletException {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append(httpServletRequest.getRequestURI());
+ Enumeration<String> headerNames = httpServletRequest.getHeaderNames();
+ while (headerNames.hasMoreElements()) {
+ stringBuilder.append('\n');
+ String name = headerNames.nextElement();
+ stringBuilder.append(name);
+ stringBuilder.append(": ");
+ stringBuilder.append(httpServletRequest.getHeader(name));
+ }
+ stringBuilder.append("\n\n");
+ ServletInputStream is = request.getInputStream();
+ byte[] httpInData = new byte[request.getContentLength()];
+ int len = -1;
+ while ((len = is.read(httpInData)) != -1) {
+ stringBuilder.append(new String(httpInData, 0, len, "UTF-8"));
+ }
+ requests.add(stringBuilder.toString());
+ httpServletResponse.setStatus(HttpServletResponse.SC_OK);
+ request.setHandled(true);
+ }
+ });
+ try {
+ server.start();
+ for (int i = 0; i < 30; i++) {
+ Thread.sleep(1000);
+ if (server.isRunning()) {
+ break;
+ }
+ if (server.isFailed()) {
+ throw new Exception("MockService startup failed - the test will fail...");
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Exception starting MockService", e);
+ }
+ }
+
+ void close() throws Exception {
+ if (server != null) {
+ server.stop();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9c8e829f/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
index 1a43bd3..a2beed4 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
@@ -43,7 +43,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
private static AtomicBoolean actionInitCalled = new AtomicBoolean(false);
private static AtomicBoolean actionCloseCalled = new AtomicBoolean(false);
- private AutoScaling.TriggerListener noFirstRunListener = event -> {
+ private AutoScaling.EventProcessor noFirstRunProcessor = event -> {
fail("Did not expect the listener to fire on first run!");
return true;
};
@@ -73,13 +73,13 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
Map<String, Object> props = createTriggerProps(waitForSeconds);
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
- trigger.setListener(noFirstRunListener);
+ trigger.setProcessor(noFirstRunProcessor);
trigger.run();
JettySolrRunner newNode = cluster.startJettySolrRunner();
AtomicBoolean fired = new AtomicBoolean(false);
AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
- trigger.setListener(event -> {
+ trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = timeSource.getTime();
@@ -112,12 +112,12 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
final long waitTime = 2;
props.put("waitFor", waitTime);
- trigger.setListener(noFirstRunListener);
+ trigger.setProcessor(noFirstRunProcessor);
trigger.run();
JettySolrRunner newNode = cluster.startJettySolrRunner();
AtomicBoolean fired = new AtomicBoolean(false);
- trigger.setListener(event -> {
+ trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
long currentTimeNanos = timeSource.getTime();
long eventTimeNanos = event.getEventTime();
@@ -196,14 +196,14 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
Map<String, Object> props = createTriggerProps(0);
try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
- trigger.setListener(noFirstRunListener);
+ trigger.setProcessor(noFirstRunProcessor);
trigger.run(); // starts tracking live nodes
JettySolrRunner newNode = cluster.startJettySolrRunner();
AtomicInteger callCount = new AtomicInteger(0);
AtomicBoolean fired = new AtomicBoolean(false);
- trigger.setListener(event -> {
+ trigger.setProcessor(event -> {
if (callCount.incrementAndGet() < 2) {
return false;
} else {
@@ -232,7 +232,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
// add a new node but update the trigger before the waitFor period expires
// and assert that the new trigger still fires
NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger", props, container);
- trigger.setListener(noFirstRunListener);
+ trigger.setProcessor(noFirstRunProcessor);
trigger.run();
JettySolrRunner newNode = cluster.startJettySolrRunner();
@@ -251,7 +251,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
try (NodeAddedTrigger newTrigger = new NodeAddedTrigger("node_added_trigger", props, container)) {
AtomicBoolean fired = new AtomicBoolean(false);
AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
- newTrigger.setListener(event -> {
+ newTrigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = timeSource.getTime();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/9c8e829f/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
index 82e1326..2492ce2 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
@@ -43,7 +43,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
private static AtomicBoolean actionInitCalled = new AtomicBoolean(false);
private static AtomicBoolean actionCloseCalled = new AtomicBoolean(false);
- private AutoScaling.TriggerListener noFirstRunListener = event -> {
+ private AutoScaling.EventProcessor noFirstRunProcessor = event -> {
fail("Did not expect the listener to fire on first run!");
return true;
};
@@ -74,14 +74,14 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
Map<String, Object> props = createTriggerProps(waitForSeconds);
try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
- trigger.setListener(noFirstRunListener);
+ trigger.setProcessor(noFirstRunProcessor);
trigger.run();
String lostNodeName = cluster.getJettySolrRunner(1).getNodeName();
cluster.stopJettySolrRunner(1);
AtomicBoolean fired = new AtomicBoolean(false);
AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
- trigger.setListener(event -> {
+ trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = timeSource.getTime();
@@ -115,13 +115,13 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
final long waitTime = 2;
props.put("waitFor", waitTime);
- trigger.setListener(noFirstRunListener);
+ trigger.setProcessor(noFirstRunProcessor);
trigger.run();
JettySolrRunner lostNode = cluster.getJettySolrRunner(1);
lostNode.stop();
AtomicBoolean fired = new AtomicBoolean(false);
- trigger.setListener(event -> {
+ trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
long currentTimeNanos = timeSource.getTime();
long eventTimeNanos = event.getEventTime();
@@ -210,7 +210,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
CoreContainer container = cluster.getJettySolrRunners().get(0).getCoreContainer();
Map<String, Object> props = createTriggerProps(0);
try (NodeLostTrigger trigger = new NodeLostTrigger("node_added_trigger", props, container)) {
- trigger.setListener(noFirstRunListener);
+ trigger.setProcessor(noFirstRunProcessor);
JettySolrRunner newNode = cluster.startJettySolrRunner();
cluster.waitForAllNodes(5);
@@ -230,7 +230,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
AtomicInteger callCount = new AtomicInteger(0);
AtomicBoolean fired = new AtomicBoolean(false);
- trigger.setListener(event -> {
+ trigger.setProcessor(event -> {
if (callCount.incrementAndGet() < 2) {
return false;
} else {
@@ -263,7 +263,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
// and assert that the new trigger still fires
NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger", props, container);
- trigger.setListener(noFirstRunListener);
+ trigger.setProcessor(noFirstRunProcessor);
trigger.run();
// stop the newly created node
@@ -291,7 +291,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
try (NodeLostTrigger newTrigger = new NodeLostTrigger("node_lost_trigger", props, container)) {
AtomicBoolean fired = new AtomicBoolean(false);
AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
- newTrigger.setListener(event -> {
+ newTrigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
eventRef.set(event);
long currentTimeNanos = timeSource.getTime();