You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/07/06 17:58:02 UTC

lucene-solr:jira/solr-10996: SOLR-10996: Fix issues from review. Implement HttpTriggerListener.

Repository: lucene-solr
Updated Branches:
  refs/heads/jira/solr-10996 b52e11480 -> 1f02ae325


SOLR-10996: Fix issues from review. Implement HttpTriggerListener.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/1f02ae32
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/1f02ae32
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/1f02ae32

Branch: refs/heads/jira/solr-10996
Commit: 1f02ae325e9e2e97b02a1bd544152e768242ae93
Parents: b52e114
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Thu Jul 6 19:57:33 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Thu Jul 6 19:57:33 2017 +0200

----------------------------------------------------------------------
 .../cloud/autoscaling/AutoScalingConfig.java    |   4 +-
 .../cloud/autoscaling/HttpTriggerListener.java  | 111 ++++++++++++++++++-
 .../cloud/autoscaling/LogTriggerListener.java   |   5 +-
 .../cloud/autoscaling/ScheduledTriggers.java    |  67 ++++++-----
 .../solr/cloud/autoscaling/TriggerListener.java |   7 +-
 .../autoscaling/TriggerIntegrationTest.java     |   3 +-
 6 files changed, 161 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f02ae32/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
index 056091d..e3176ad 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
@@ -18,6 +18,7 @@ package org.apache.solr.cloud.autoscaling;
 
 import java.lang.invoke.MethodHandles;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -50,7 +51,7 @@ public class AutoScalingConfig {
   public static class TriggerListenerConfig {
     public final String name;
     public final String trigger;
-    public final Set<AutoScaling.EventProcessorStage> stages;
+    public final EnumSet<AutoScaling.EventProcessorStage> stages = EnumSet.noneOf(AutoScaling.EventProcessorStage.class);
     public final String listenerClass;
     public final Set<String> beforeActions;
     public final Set<String> afterActions;
@@ -61,7 +62,6 @@ public class AutoScalingConfig {
       this.properties.putAll(properties);
       trigger = (String)properties.get(AutoScalingParams.TRIGGER);
       List<String> stageNames = getList(AutoScalingParams.STAGE, properties);
-      stages = new HashSet<>(stageNames.size());
       for (String stageName : stageNames) {
         try {
           AutoScaling.EventProcessorStage stage = AutoScaling.EventProcessorStage.valueOf(stageName.toUpperCase(Locale.ROOT));

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f02ae32/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
index 4253bf6..d53e444 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
@@ -16,24 +16,129 @@
  */
 package org.apache.solr.cloud.autoscaling;
 
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
 import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CoreContainer;
+import org.apache.solr.util.PropertiesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Simple HTTP callback that sends trigger events as JSON.
+ * Simple HTTP callback that POSTs event data to a URL.
+ * <p>URL and payload may contain property substitution patterns, with the following properties available:
+ * <ul>
+ *   <li>config.* - listener configuration</li>
+ *   <li>event.* - event properties</li>
+ *   <li>stage - current stage of event processing</li>
+ *   <li>actionName - optional current action name</li>
+ *   <li>context.* - optional {@link ActionContext} properties</li>
+ *   <li>error - optional error string (from {@link Throwable#toString()})</li>
+ *   <li>message - optional message</li>
+ * </ul>
+ * </p>
+ * The following listener configuration is supported:
+ * <ul>
+ *   <li>url - a URL template</li>
+ *   <li>payload - optional payload template. If absent a JSON string of all properties listed above will be used.</li>
+ *   <li>contentType - optional payload content type. If absent then <code>application/json</code> will be used.</li>
+ *   <li>header.* - optional header template(s). The name of the property without "header." prefix defines the literal header name.</li>
+ * </ul>
  */
 public class HttpTriggerListener extends TriggerListenerBase {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private HttpClient httpClient;
+  private String urlTemplate;
+  private String payloadTemplate;
+  private String contentType;
+  private Map<String, String> headerTemplates = new HashMap<>();
 
   @Override
   public void init(CoreContainer coreContainer, AutoScalingConfig.TriggerListenerConfig config) {
     super.init(coreContainer, config);
     httpClient = coreContainer.getUpdateShardHandler().getHttpClient();
+    urlTemplate = (String)config.properties.get("url");
+    payloadTemplate = (String)config.properties.get("payload");
+    contentType = (String)config.properties.get("contentType");
+    config.properties.forEach((k, v) -> {
+      if (k.startsWith("header.")) {
+        headerTemplates.put(k.substring(7), String.valueOf(v));
+      }
+    });
   }
 
   @Override
-  public void onEvent(AutoScaling.EventProcessorStage stage, String actionName, TriggerEvent event, String message) {
-
+  public void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) {
+    Properties properties = new Properties();
+    properties.setProperty("stage", stage.toString());
+    if (actionName != null) {
+      properties.setProperty("actionName", actionName);
+    }
+    if (context != null) {
+      context.getProperties().forEach((k, v) -> {
+        properties.setProperty("context." + k, String.valueOf(v));
+      });
+    }
+    if (error != null) {
+      properties.setProperty("error", error.toString());
+    }
+    if (message != null) {
+      properties.setProperty("message", message);
+    }
+    // add event properties
+    properties.setProperty("event.id", event.getId());
+    properties.setProperty("event.source", event.getSource());
+    properties.setProperty("event.eventTime", String.valueOf(event.eventTime));
+    properties.setProperty("event.eventType", event.getEventType().toString());
+    event.getProperties().forEach((k, v) -> {
+      properties.setProperty("event.properties." + k, String.valueOf(v));
+    });
+    // add config properties
+    config.properties.forEach((k, v) -> {
+      properties.setProperty("config." + k, String.valueOf(v));
+    });
+    String url = PropertiesUtil.substituteProperty(urlTemplate, properties);
+    String payload;
+    String type;
+    if (payloadTemplate != null) {
+      payload = PropertiesUtil.substituteProperty(payloadTemplate, properties);
+      if (contentType != null) {
+        type = contentType;
+      } else {
+        type = "application/json";
+      }
+    } else {
+      payload = Utils.toJSONString(properties);
+      type = "application/json";
+    }
+    HttpPost post = new HttpPost(url);
+    HttpEntity entity = new StringEntity(payload, "UTF-8");
+    headerTemplates.forEach((k, v) -> {
+      String headerVal = PropertiesUtil.substituteProperty(v, properties);
+      if (!headerVal.isEmpty()) {
+        post.addHeader(k, headerVal);
+      }
+    });
+    post.setEntity(entity);
+    post.setHeader("Content-Type", type);
+    try {
+      HttpResponse rsp = httpClient.execute(post);
+      int statusCode = rsp.getStatusLine().getStatusCode();
+      if (statusCode != 200) {
+        LOG.warn("Error sending request for event " + event + ", HTTP response: " + rsp.toString());
+      }
+    } catch (IOException e) {
+      LOG.warn("Exception sending request for event " + event, e);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f02ae32/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java
index bf00114..108f41b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java
@@ -32,7 +32,8 @@ public class LogTriggerListener extends TriggerListenerBase {
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   @Override
-  public void onEvent(AutoScaling.EventProcessorStage stage, String actionName, TriggerEvent event, String message) {
-    LOG.info("stage={}, actionName={}, event={}, messsage={}", stage, actionName, event, message);
+  public void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName, ActionContext context,
+               Throwable error, String message) {
+    LOG.info("{}: stage={}, actionName={}, event={}, error={}, messsage={}", config.name, stage, actionName, event, error, message);
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f02ae32/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
index d5d29de..d2f03f5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -144,24 +145,24 @@ public class ScheduledTriggers implements Closeable {
       scheduledTriggers.replace(newTrigger.getName(), scheduledTrigger);
     }
     newTrigger.setProcessor(event -> {
-      listeners.fireListeners(event.getSource(), AutoScaling.EventProcessorStage.STARTED, null, event, null);
       ScheduledTrigger scheduledSource = scheduledTriggers.get(event.getSource());
       if (scheduledSource == null) {
-        String msg = String.format("Ignoring autoscaling event {} because the source trigger: {} doesn't exist.", event.toString(), event.getSource());
-        listeners.fireListeners(event.getSource(), AutoScaling.EventProcessorStage.FAILED, null, event, msg);
+        String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the source trigger: %s doesn't exist.", event.toString(), event.getSource());
+        listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.FAILED, msg);
         log.warn(msg);
         return false;
       }
       boolean replaying = event.getProperty(TriggerEvent.REPLAYING) != null ? (Boolean)event.getProperty(TriggerEvent.REPLAYING) : false;
       AutoScaling.Trigger source = scheduledSource.trigger;
       if (source.isClosed()) {
-        String msg = String.format("Ignoring autoscaling event {} because the source trigger: {} has already been closed", event.toString(), source);
-        listeners.fireListeners(event.getSource(), AutoScaling.EventProcessorStage.ABORTED, null, event, msg);
+        String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the source trigger: %s has already been closed", event.toString(), source);
+        listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.ABORTED, msg);
         log.warn(msg);
         // we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
         return false;
       }
       if (hasPendingActions.compareAndSet(false, true)) {
+        listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.STARTED);
         final boolean enqueued;
         if (replaying) {
           enqueued = false;
@@ -179,21 +180,21 @@ public class ScheduledTriggers implements Closeable {
               actionThrottle.markAttemptingAction();
               ActionContext actionContext = new ActionContext(coreContainer, newTrigger, new HashMap<>());
               for (TriggerAction action : actions) {
-                listeners.fireListeners(event.getSource(), AutoScaling.EventProcessorStage.BEFORE_ACTION, action.getName(), event, null);
+                listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.BEFORE_ACTION, action.getName(), actionContext);
                 try {
                   action.process(event, actionContext);
                 } catch (Exception e) {
-                  listeners.fireListeners(event.getSource(), AutoScaling.EventProcessorStage.FAILED, action.getName(), event, null);
+                  listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.FAILED, action.getName(), actionContext, e, null);
                   log.error("Error executing action: " + action.getName() + " for trigger event: " + event, e);
                   throw e;
                 }
-                listeners.fireListeners(event.getSource(), AutoScaling.EventProcessorStage.AFTER_ACTION, action.getName(), event, null);
+                listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.AFTER_ACTION, action.getName(), actionContext);
               }
               if (enqueued) {
                 TriggerEvent ev = scheduledTrigger.dequeue();
                 assert ev.getId().equals(event.getId());
               }
-              listeners.fireListeners(event.getSource(), AutoScaling.EventProcessorStage.SUCCEEDED, null, event, null);
+              listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.SUCCEEDED);
             } finally {
               hasPendingActions.set(false);
             }
@@ -206,12 +207,11 @@ public class ScheduledTriggers implements Closeable {
               + " is broken! Expected event=" + event + " but got " + ev);
             }
           }
-          listeners.fireListeners(event.getSource(), AutoScaling.EventProcessorStage.SUCCEEDED, null, event, null);
+          listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.SUCCEEDED);
           hasPendingActions.set(false);
         }
         return true;
       } else {
-        listeners.fireListeners(event.getSource(), AutoScaling.EventProcessorStage.WAITING, null, event, null);
         // there is an action in the queue and we don't want to enqueue another until it is complete
         return false;
       }
@@ -375,9 +375,8 @@ public class ScheduledTriggers implements Closeable {
 
     void setAutoScalingConfig(AutoScalingConfig autoScalingConfig) {
       updateLock.lock();
+      reset();
       try {
-        // close previous instances
-        close();
         // instantiate only those for existing triggers
         Set<String> triggerNames = autoScalingConfig.getTriggerConfigs().keySet();
         Map<String, AutoScalingConfig.TriggerListenerConfig> configs = autoScalingConfig.getTriggerListenerConfigs();
@@ -417,31 +416,29 @@ public class ScheduledTriggers implements Closeable {
     }
 
     private void addPerStage(String triggerName, AutoScaling.EventProcessorStage stage, TriggerListener listener) {
-      Map<AutoScaling.EventProcessorStage, List<TriggerListener>> perStage = listenerPerStage.get(triggerName);
-      if (perStage == null) {
-        perStage = new HashMap<>();
-        listenerPerStage.put(triggerName, perStage);
-      }
-      List<TriggerListener> lst = perStage.get(stage);
-      if (lst == null) {
-        lst = new ArrayList<>(3);
-        perStage.put(stage, lst);
-      }
+      Map<AutoScaling.EventProcessorStage, List<TriggerListener>> perStage =
+          listenerPerStage.computeIfAbsent(triggerName, k -> new HashMap<>());
+      List<TriggerListener> lst = perStage.computeIfAbsent(stage, k -> new ArrayList<>(3));
       lst.add(listener);
     }
 
-    void close() {
+    void reset() {
       updateLock.lock();
       try {
         listenerPerStage.clear();
         for (TriggerListener listener : listeners) {
           IOUtils.closeQuietly(listener);
         }
+        listeners.clear();
       } finally {
         updateLock.unlock();
       }
     }
 
+    void close() {
+      reset();
+    }
+
     List<TriggerListener> getTriggerListeners(String trigger, AutoScaling.EventProcessorStage stage) {
       Map<AutoScaling.EventProcessorStage, List<TriggerListener>> perStage = listenerPerStage.get(trigger);
       if (perStage == null) {
@@ -455,7 +452,21 @@ public class ScheduledTriggers implements Closeable {
       }
     }
 
-    void fireListeners(String trigger, AutoScaling.EventProcessorStage stage, String actionName, TriggerEvent event, String message) {
+    void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage) {
+      fireListeners(trigger, event, stage, null, null, null, null);
+    }
+
+    void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage, String message) {
+      fireListeners(trigger, event, stage, null, null, null, message);
+    }
+
+    void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName,
+                       ActionContext context) {
+      fireListeners(trigger, event, stage, actionName, context, null, null);
+    }
+
+    void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName,
+                       ActionContext context, Throwable error, String message) {
       updateLock.lock();
       try {
         for (TriggerListener listener : getTriggerListeners(trigger, stage)) {
@@ -471,7 +482,11 @@ public class ScheduledTriggers implements Closeable {
               }
             }
           }
-          listener.onEvent(stage, actionName, event, message);
+          try {
+            listener.onEvent(event, stage, actionName, context, error, message);
+          } catch (Exception e) {
+            log.warn("Exception running listener " + listener.getTriggerListenerConfig(), e);
+          }
         }
       } finally {
         updateLock.unlock();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f02ae32/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java
index 0cd5735..e67366c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java
@@ -33,10 +33,13 @@ public interface TriggerListener extends Closeable {
   /**
    * This method is called when either a particular <code>stage</code> or
    * <code>actionName</code> is reached during event processing.
+   * @param event current event being processed
    * @param stage {@link AutoScaling.EventProcessorStage} that this listener was registered for, or null
    * @param actionName {@link TriggerAction} name that this listener was registered for, or null
-   * @param event current event being processed
+   * @param context optional {@link ActionContext} when the processing stage is related to an action, or null
+   * @param error optional {@link Throwable} error, or null
    * @param message optional message
    */
-  void onEvent(AutoScaling.EventProcessorStage stage, String actionName, TriggerEvent event, String message);
+  void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName, ActionContext context,
+               Throwable error, String message);
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f02ae32/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index 4f44afb..07a8e60 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -968,7 +968,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     }
 
     @Override
-    public synchronized void onEvent(AutoScaling.EventProcessorStage stage, String actionName, TriggerEvent event, String message) {
+    public synchronized void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName,
+                                     ActionContext context, Throwable error, String message) {
       List<TestEvent> lst = listenerEvents.get(config.name);
       if (lst == null) {
         lst = new ArrayList<>();