You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/07/06 17:58:02 UTC
lucene-solr:jira/solr-10996: SOLR-10996: Fix issues from review.
Implement HttpTriggerListener.
Repository: lucene-solr
Updated Branches:
refs/heads/jira/solr-10996 b52e11480 -> 1f02ae325
SOLR-10996: Fix issues from review. Implement HttpTriggerListener.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/1f02ae32
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/1f02ae32
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/1f02ae32
Branch: refs/heads/jira/solr-10996
Commit: 1f02ae325e9e2e97b02a1bd544152e768242ae93
Parents: b52e114
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Thu Jul 6 19:57:33 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Thu Jul 6 19:57:33 2017 +0200
----------------------------------------------------------------------
.../cloud/autoscaling/AutoScalingConfig.java | 4 +-
.../cloud/autoscaling/HttpTriggerListener.java | 111 ++++++++++++++++++-
.../cloud/autoscaling/LogTriggerListener.java | 5 +-
.../cloud/autoscaling/ScheduledTriggers.java | 67 ++++++-----
.../solr/cloud/autoscaling/TriggerListener.java | 7 +-
.../autoscaling/TriggerIntegrationTest.java | 3 +-
6 files changed, 161 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f02ae32/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
index 056091d..e3176ad 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
@@ -18,6 +18,7 @@ package org.apache.solr.cloud.autoscaling;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -50,7 +51,7 @@ public class AutoScalingConfig {
public static class TriggerListenerConfig {
public final String name;
public final String trigger;
- public final Set<AutoScaling.EventProcessorStage> stages;
+ public final EnumSet<AutoScaling.EventProcessorStage> stages = EnumSet.noneOf(AutoScaling.EventProcessorStage.class);
public final String listenerClass;
public final Set<String> beforeActions;
public final Set<String> afterActions;
@@ -61,7 +62,6 @@ public class AutoScalingConfig {
this.properties.putAll(properties);
trigger = (String)properties.get(AutoScalingParams.TRIGGER);
List<String> stageNames = getList(AutoScalingParams.STAGE, properties);
- stages = new HashSet<>(stageNames.size());
for (String stageName : stageNames) {
try {
AutoScaling.EventProcessorStage stage = AutoScaling.EventProcessorStage.valueOf(stageName.toUpperCase(Locale.ROOT));
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f02ae32/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
index 4253bf6..d53e444 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
@@ -16,24 +16,129 @@
*/
package org.apache.solr.cloud.autoscaling;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
+import org.apache.solr.util.PropertiesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Simple HTTP callback that sends trigger events as JSON.
+ * Simple HTTP callback that POSTs event data to a URL.
+ * <p>URL and payload may contain property substitution patterns, with the following properties available:
+ * <ul>
+ * <li>config.* - listener configuration</li>
+ * <li>event.* - event properties</li>
+ * <li>stage - current stage of event processing</li>
+ * <li>actionName - optional current action name</li>
+ * <li>context.* - optional {@link ActionContext} properties</li>
+ * <li>error - optional error string (from {@link Throwable#toString()})</li>
+ * <li>message - optional message</li>
+ * </ul>
+ * </p>
+ * The following listener configuration is supported:
+ * <ul>
+ * <li>url - a URL template</li>
+ * <li>payload - optional payload template. If absent a JSON string of all properties listed above will be used.</li>
+ * <li>contentType - optional payload content type. If absent then <code>application/json</code> will be used.</li>
+ * <li>header.* - optional header template(s). The name of the property without "header." prefix defines the literal header name.</li>
+ * </ul>
*/
public class HttpTriggerListener extends TriggerListenerBase {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private HttpClient httpClient;
+ private String urlTemplate;
+ private String payloadTemplate;
+ private String contentType;
+ private Map<String, String> headerTemplates = new HashMap<>();
@Override
public void init(CoreContainer coreContainer, AutoScalingConfig.TriggerListenerConfig config) {
super.init(coreContainer, config);
httpClient = coreContainer.getUpdateShardHandler().getHttpClient();
+ urlTemplate = (String)config.properties.get("url");
+ payloadTemplate = (String)config.properties.get("payload");
+ contentType = (String)config.properties.get("contentType");
+ config.properties.forEach((k, v) -> {
+ if (k.startsWith("header.")) {
+ headerTemplates.put(k.substring(7), String.valueOf(v));
+ }
+ });
}
@Override
- public void onEvent(AutoScaling.EventProcessorStage stage, String actionName, TriggerEvent event, String message) {
-
+ public void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) {
+ Properties properties = new Properties();
+ properties.setProperty("stage", stage.toString());
+ if (actionName != null) {
+ properties.setProperty("actionName", actionName);
+ }
+ if (context != null) {
+ context.getProperties().forEach((k, v) -> {
+ properties.setProperty("context." + k, String.valueOf(v));
+ });
+ }
+ if (error != null) {
+ properties.setProperty("error", error.toString());
+ }
+ if (message != null) {
+ properties.setProperty("message", message);
+ }
+ // add event properties
+ properties.setProperty("event.id", event.getId());
+ properties.setProperty("event.source", event.getSource());
+ properties.setProperty("event.eventTime", String.valueOf(event.eventTime));
+ properties.setProperty("event.eventType", event.getEventType().toString());
+ event.getProperties().forEach((k, v) -> {
+ properties.setProperty("event.properties." + k, String.valueOf(v));
+ });
+ // add config properties
+ config.properties.forEach((k, v) -> {
+ properties.setProperty("config." + k, String.valueOf(v));
+ });
+ String url = PropertiesUtil.substituteProperty(urlTemplate, properties);
+ String payload;
+ String type;
+ if (payloadTemplate != null) {
+ payload = PropertiesUtil.substituteProperty(payloadTemplate, properties);
+ if (contentType != null) {
+ type = contentType;
+ } else {
+ type = "application/json";
+ }
+ } else {
+ payload = Utils.toJSONString(properties);
+ type = "application/json";
+ }
+ HttpPost post = new HttpPost(url);
+ HttpEntity entity = new StringEntity(payload, "UTF-8");
+ headerTemplates.forEach((k, v) -> {
+ String headerVal = PropertiesUtil.substituteProperty(v, properties);
+ if (!headerVal.isEmpty()) {
+ post.addHeader(k, headerVal);
+ }
+ });
+ post.setEntity(entity);
+ post.setHeader("Content-Type", type);
+ try {
+ HttpResponse rsp = httpClient.execute(post);
+ int statusCode = rsp.getStatusLine().getStatusCode();
+ if (statusCode != 200) {
+ LOG.warn("Error sending request for event " + event + ", HTTP response: " + rsp.toString());
+ }
+ } catch (IOException e) {
+ LOG.warn("Exception sending request for event " + event, e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f02ae32/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java
index bf00114..108f41b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java
@@ -32,7 +32,8 @@ public class LogTriggerListener extends TriggerListenerBase {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Override
- public void onEvent(AutoScaling.EventProcessorStage stage, String actionName, TriggerEvent event, String message) {
- LOG.info("stage={}, actionName={}, event={}, messsage={}", stage, actionName, event, message);
+ public void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName, ActionContext context,
+ Throwable error, String message) {
+ LOG.info("{}: stage={}, actionName={}, event={}, error={}, messsage={}", config.name, stage, actionName, event, error, message);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f02ae32/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
index d5d29de..d2f03f5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -144,24 +145,24 @@ public class ScheduledTriggers implements Closeable {
scheduledTriggers.replace(newTrigger.getName(), scheduledTrigger);
}
newTrigger.setProcessor(event -> {
- listeners.fireListeners(event.getSource(), AutoScaling.EventProcessorStage.STARTED, null, event, null);
ScheduledTrigger scheduledSource = scheduledTriggers.get(event.getSource());
if (scheduledSource == null) {
- String msg = String.format("Ignoring autoscaling event {} because the source trigger: {} doesn't exist.", event.toString(), event.getSource());
- listeners.fireListeners(event.getSource(), AutoScaling.EventProcessorStage.FAILED, null, event, msg);
+ String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the source trigger: %s doesn't exist.", event.toString(), event.getSource());
+ listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.FAILED, msg);
log.warn(msg);
return false;
}
boolean replaying = event.getProperty(TriggerEvent.REPLAYING) != null ? (Boolean)event.getProperty(TriggerEvent.REPLAYING) : false;
AutoScaling.Trigger source = scheduledSource.trigger;
if (source.isClosed()) {
- String msg = String.format("Ignoring autoscaling event {} because the source trigger: {} has already been closed", event.toString(), source);
- listeners.fireListeners(event.getSource(), AutoScaling.EventProcessorStage.ABORTED, null, event, msg);
+ String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the source trigger: %s has already been closed", event.toString(), source);
+ listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.ABORTED, msg);
log.warn(msg);
// we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
return false;
}
if (hasPendingActions.compareAndSet(false, true)) {
+ listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.STARTED);
final boolean enqueued;
if (replaying) {
enqueued = false;
@@ -179,21 +180,21 @@ public class ScheduledTriggers implements Closeable {
actionThrottle.markAttemptingAction();
ActionContext actionContext = new ActionContext(coreContainer, newTrigger, new HashMap<>());
for (TriggerAction action : actions) {
- listeners.fireListeners(event.getSource(), AutoScaling.EventProcessorStage.BEFORE_ACTION, action.getName(), event, null);
+ listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.BEFORE_ACTION, action.getName(), actionContext);
try {
action.process(event, actionContext);
} catch (Exception e) {
- listeners.fireListeners(event.getSource(), AutoScaling.EventProcessorStage.FAILED, action.getName(), event, null);
+ listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.FAILED, action.getName(), actionContext, e, null);
log.error("Error executing action: " + action.getName() + " for trigger event: " + event, e);
throw e;
}
- listeners.fireListeners(event.getSource(), AutoScaling.EventProcessorStage.AFTER_ACTION, action.getName(), event, null);
+ listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.AFTER_ACTION, action.getName(), actionContext);
}
if (enqueued) {
TriggerEvent ev = scheduledTrigger.dequeue();
assert ev.getId().equals(event.getId());
}
- listeners.fireListeners(event.getSource(), AutoScaling.EventProcessorStage.SUCCEEDED, null, event, null);
+ listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.SUCCEEDED);
} finally {
hasPendingActions.set(false);
}
@@ -206,12 +207,11 @@ public class ScheduledTriggers implements Closeable {
+ " is broken! Expected event=" + event + " but got " + ev);
}
}
- listeners.fireListeners(event.getSource(), AutoScaling.EventProcessorStage.SUCCEEDED, null, event, null);
+ listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.SUCCEEDED);
hasPendingActions.set(false);
}
return true;
} else {
- listeners.fireListeners(event.getSource(), AutoScaling.EventProcessorStage.WAITING, null, event, null);
// there is an action in the queue and we don't want to enqueue another until it is complete
return false;
}
@@ -375,9 +375,8 @@ public class ScheduledTriggers implements Closeable {
void setAutoScalingConfig(AutoScalingConfig autoScalingConfig) {
updateLock.lock();
+ reset();
try {
- // close previous instances
- close();
// instantiate only those for existing triggers
Set<String> triggerNames = autoScalingConfig.getTriggerConfigs().keySet();
Map<String, AutoScalingConfig.TriggerListenerConfig> configs = autoScalingConfig.getTriggerListenerConfigs();
@@ -417,31 +416,29 @@ public class ScheduledTriggers implements Closeable {
}
private void addPerStage(String triggerName, AutoScaling.EventProcessorStage stage, TriggerListener listener) {
- Map<AutoScaling.EventProcessorStage, List<TriggerListener>> perStage = listenerPerStage.get(triggerName);
- if (perStage == null) {
- perStage = new HashMap<>();
- listenerPerStage.put(triggerName, perStage);
- }
- List<TriggerListener> lst = perStage.get(stage);
- if (lst == null) {
- lst = new ArrayList<>(3);
- perStage.put(stage, lst);
- }
+ Map<AutoScaling.EventProcessorStage, List<TriggerListener>> perStage =
+ listenerPerStage.computeIfAbsent(triggerName, k -> new HashMap<>());
+ List<TriggerListener> lst = perStage.computeIfAbsent(stage, k -> new ArrayList<>(3));
lst.add(listener);
}
- void close() {
+ void reset() {
updateLock.lock();
try {
listenerPerStage.clear();
for (TriggerListener listener : listeners) {
IOUtils.closeQuietly(listener);
}
+ listeners.clear();
} finally {
updateLock.unlock();
}
}
+ void close() {
+ reset();
+ }
+
List<TriggerListener> getTriggerListeners(String trigger, AutoScaling.EventProcessorStage stage) {
Map<AutoScaling.EventProcessorStage, List<TriggerListener>> perStage = listenerPerStage.get(trigger);
if (perStage == null) {
@@ -455,7 +452,21 @@ public class ScheduledTriggers implements Closeable {
}
}
- void fireListeners(String trigger, AutoScaling.EventProcessorStage stage, String actionName, TriggerEvent event, String message) {
+ void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage) {
+ fireListeners(trigger, event, stage, null, null, null, null);
+ }
+
+ void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage, String message) {
+ fireListeners(trigger, event, stage, null, null, null, message);
+ }
+
+ void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName,
+ ActionContext context) {
+ fireListeners(trigger, event, stage, actionName, context, null, null);
+ }
+
+ void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName,
+ ActionContext context, Throwable error, String message) {
updateLock.lock();
try {
for (TriggerListener listener : getTriggerListeners(trigger, stage)) {
@@ -471,7 +482,11 @@ public class ScheduledTriggers implements Closeable {
}
}
}
- listener.onEvent(stage, actionName, event, message);
+ try {
+ listener.onEvent(event, stage, actionName, context, error, message);
+ } catch (Exception e) {
+ log.warn("Exception running listener " + listener.getTriggerListenerConfig(), e);
+ }
}
} finally {
updateLock.unlock();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f02ae32/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java
index 0cd5735..e67366c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java
@@ -33,10 +33,13 @@ public interface TriggerListener extends Closeable {
/**
* This method is called when either a particular <code>stage</code> or
* <code>actionName</code> is reached during event processing.
+ * @param event current event being processed
* @param stage {@link AutoScaling.EventProcessorStage} that this listener was registered for, or null
* @param actionName {@link TriggerAction} name that this listener was registered for, or null
- * @param event current event being processed
+ * @param context optional {@link ActionContext} when the processing stage is related to an action, or null
+ * @param error optional {@link Throwable} error, or null
* @param message optional message
*/
- void onEvent(AutoScaling.EventProcessorStage stage, String actionName, TriggerEvent event, String message);
+ void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName, ActionContext context,
+ Throwable error, String message);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f02ae32/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index 4f44afb..07a8e60 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -968,7 +968,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
@Override
- public synchronized void onEvent(AutoScaling.EventProcessorStage stage, String actionName, TriggerEvent event, String message) {
+ public synchronized void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName,
+ ActionContext context, Throwable error, String message) {
List<TestEvent> lst = listenerEvents.get(config.name);
if (lst == null) {
lst = new ArrayList<>();