You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/05/16 10:27:13 UTC

lucene-solr:jira/solr-10515: SOLR-10515: Initial support for per-trigger ZK queues for events.

Repository: lucene-solr
Updated Branches:
  refs/heads/jira/solr-10515 [created] 247270713


SOLR-10515: Initial support for per-trigger ZK queues for events.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/24727071
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/24727071
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/24727071

Branch: refs/heads/jira/solr-10515
Commit: 247270713cce25dacfa07799b61774d427de8b67
Parents: 607184c
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Tue May 16 12:26:23 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Tue May 16 12:26:23 2017 +0200

----------------------------------------------------------------------
 .../solr/cloud/autoscaling/AutoScaling.java     |  41 ++++----
 .../cloud/autoscaling/NodeAddedTrigger.java     |  44 +-------
 .../solr/cloud/autoscaling/NodeLostTrigger.java |  44 +-------
 .../autoscaling/OverseerTriggerThread.java      |   2 +-
 .../cloud/autoscaling/ScheduledTriggers.java    | 105 +++++++++++++------
 .../cloud/autoscaling/TriggerEventBase.java     |  84 +++++++++++++++
 .../cloud/autoscaling/TriggerEventQueue.java    |  80 ++++++++++++++
 .../cloud/autoscaling/NodeAddedTriggerTest.java |   4 +-
 .../cloud/autoscaling/NodeLostTriggerTest.java  |   4 +-
 .../autoscaling/TriggerIntegrationTest.java     |  14 +--
 .../apache/solr/common/cloud/ZkStateReader.java |   1 +
 11 files changed, 282 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/24727071/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
index cd08ea9..0b6aca3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
@@ -19,6 +19,7 @@ package org.apache.solr.cloud.autoscaling;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -47,24 +48,28 @@ public class AutoScaling {
     AFTER_ACTION
   }
 
-  public static interface TriggerEvent<T extends Trigger> {
-    public T getSource();
+  public interface TriggerEvent {
+    EventType getEventType();
 
-    public long getEventNanoTime();
+    String getSource();
 
-    public void setContext(Map<String, Object> context);
+    long getEventNanoTime();
 
-    public Map<String, Object> getContext();
+    void setProperties(Map<String, Object> properties);
+
+    Map<String, Object> getProperties();
+
+    Object getProperty(String name);
   }
 
-  public static interface TriggerListener<E extends TriggerEvent<? extends Trigger>> {
+  public interface TriggerListener<E extends TriggerEvent> {
     /**
      * This method is executed when a trigger is ready to fire.
      *
      * @param event a subclass of {@link TriggerEvent}
      * @return true if the listener was ready to perform actions on the event, false otherwise.
      */
-    public boolean triggerFired(E event);
+    boolean triggerFired(E event);
   }
 
   public static class HttpCallbackListener implements TriggerListener {
@@ -94,26 +99,26 @@ public class AutoScaling {
    *
    * @param <E> the {@link TriggerEvent} which is handled by this Trigger
    */
-  public static interface Trigger<E extends TriggerEvent<? extends Trigger>> extends Closeable, Runnable {
-    public String getName();
+  public interface Trigger<E extends TriggerEvent> extends Closeable, Runnable {
+    String getName();
 
-    public EventType getEventType();
+    EventType getEventType();
 
-    public boolean isEnabled();
+    boolean isEnabled();
 
-    public Map<String, Object> getProperties();
+    Map<String, Object> getProperties();
 
-    public int getWaitForSecond();
+    int getWaitForSecond();
 
-    public List<TriggerAction> getActions();
+    List<TriggerAction> getActions();
 
-    public void setListener(TriggerListener<E> listener);
+    void setListener(TriggerListener<E> listener);
 
-    public TriggerListener<E> getListener();
+    TriggerListener<E> getListener();
 
-    public boolean isClosed();
+    boolean isClosed();
 
-    public void restoreState(Trigger<E> old);
+    void restoreState(Trigger<E> old);
   }
 
   public static class TriggerFactory implements Closeable {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/24727071/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
index 5a340be..86cc13e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
@@ -202,7 +202,7 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
           AutoScaling.TriggerListener<NodeAddedEvent> listener = listenerRef.get();
           if (listener != null) {
             log.info("NodeAddedTrigger {} firing registered listener for node: {} added at {} nanotime, now: {} nanotime", name, nodeName, timeAdded, now);
-            if (listener.triggerFired(new NodeAddedEvent(this, timeAdded, nodeName))) {
+            if (listener.triggerFired(new NodeAddedEvent(getEventType(), getName(), timeAdded, nodeName))) {
               // remove from tracking set only if the fire was accepted
               trackingKeySet.remove(nodeName);
             }
@@ -225,45 +225,11 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
     }
   }
 
-  public static class NodeAddedEvent implements AutoScaling.TriggerEvent<NodeAddedTrigger> {
-    private final NodeAddedTrigger source;
-    private final long nodeAddedNanoTime;
-    private final String nodeName;
+  public static class NodeAddedEvent extends TriggerEventBase {
+    public static final String NODE_NAME = "nodeName";
 
-    private Map<String, Object> context;
-
-    public NodeAddedEvent(NodeAddedTrigger source, long nodeAddedNanoTime, String nodeAdded) {
-      this.source = source;
-      this.nodeAddedNanoTime = nodeAddedNanoTime;
-      this.nodeName = nodeAdded;
-    }
-
-    @Override
-    public NodeAddedTrigger getSource() {
-      return source;
-    }
-
-    @Override
-    public long getEventNanoTime() {
-      return nodeAddedNanoTime;
-    }
-
-    public String getNodeName() {
-      return nodeName;
-    }
-
-    public AutoScaling.EventType getType() {
-      return source.getEventType();
-    }
-
-    @Override
-    public void setContext(Map<String, Object> context) {
-      this.context = context;
-    }
-
-    @Override
-    public Map<String, Object> getContext() {
-      return context;
+    public NodeAddedEvent(AutoScaling.EventType eventType, String source, long nodeAddedNanoTime, String nodeAdded) {
+      super(eventType, source, nodeAddedNanoTime, Collections.singletonMap(NODE_NAME, nodeAdded));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/24727071/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
index 385eca5..35c94ac 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -197,7 +197,7 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.Node
           AutoScaling.TriggerListener<NodeLostEvent> listener = listenerRef.get();
           if (listener != null) {
             log.info("NodeLostTrigger firing registered listener");
-            if (listener.triggerFired(new NodeLostEvent(this, timeRemoved, nodeName)))  {
+            if (listener.triggerFired(new NodeLostEvent(getEventType(), getName(), timeRemoved, nodeName)))  {
               trackingKeySet.remove(nodeName);
             }
           } else  {
@@ -219,45 +219,11 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.Node
     }
   }
 
-  public static class NodeLostEvent implements AutoScaling.TriggerEvent<NodeLostTrigger> {
-    private final NodeLostTrigger source;
-    private final long nodeLostNanoTime;
-    private final String nodeName;
+  public static class NodeLostEvent extends TriggerEventBase {
+    public static final String NODE_NAME = "nodeName";
 
-    private Map<String, Object> context;
-
-    public NodeLostEvent(NodeLostTrigger source, long nodeLostNanoTime, String nodeRemoved) {
-      this.source = source;
-      this.nodeLostNanoTime = nodeLostNanoTime;
-      this.nodeName = nodeRemoved;
-    }
-
-    @Override
-    public NodeLostTrigger getSource() {
-      return source;
-    }
-
-    @Override
-    public long getEventNanoTime() {
-      return nodeLostNanoTime;
-    }
-
-    public String getNodeName() {
-      return nodeName;
-    }
-
-    public AutoScaling.EventType getType() {
-      return source.getEventType();
-    }
-
-    @Override
-    public void setContext(Map<String, Object> context) {
-      this.context = context;
-    }
-
-    @Override
-    public Map<String, Object> getContext() {
-      return context;
+    public NodeLostEvent(AutoScaling.EventType eventType, String source, long nodeLostNanoTime, String nodeRemoved) {
+      super(eventType, source, nodeLostNanoTime, Collections.singletonMap(NODE_NAME, nodeRemoved));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/24727071/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
index eca8c0b..0db38fb 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
@@ -74,7 +74,7 @@ public class OverseerTriggerThread implements Runnable, Closeable {
   public OverseerTriggerThread(ZkController zkController) {
     this.zkController = zkController;
     zkStateReader = zkController.getZkStateReader();
-    scheduledTriggers = new ScheduledTriggers();
+    scheduledTriggers = new ScheduledTriggers(zkController.getZkClient());
     triggerFactory = new AutoScaling.TriggerFactory(zkController.getCoreContainer());
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/24727071/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
index 8dfbddf..9b80f09 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledFuture;
@@ -35,6 +36,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.solr.cloud.ActionThrottle;
+import org.apache.solr.cloud.DistributedQueue;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.ExecutorUtil;
 import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.util.DefaultSolrThreadFactory;
@@ -50,7 +55,7 @@ public class ScheduledTriggers implements Closeable {
   static final int DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS = 1;
   static final int DEFAULT_MIN_MS_BETWEEN_ACTIONS = 5000;
 
-  private final Map<String, ScheduledTrigger> scheduledTriggers = new HashMap<>();
+  private final Map<String, ScheduledTrigger> scheduledTriggers = new ConcurrentHashMap<>();
 
   /**
    * Thread pool for scheduling the triggers
@@ -70,7 +75,11 @@ public class ScheduledTriggers implements Closeable {
 
   private final ActionThrottle actionThrottle;
 
-  public ScheduledTriggers() {
+  private final SolrZkClient zkClient;
+
+  private final Overseer.Stats queueStats;
+
+  public ScheduledTriggers(SolrZkClient zkClient) {
     // todo make the core pool size configurable
     // it is important to use more than one because a time taking trigger can starve other scheduled triggers
     // ideally we should have as many core threads as the number of triggers but firstly, we don't know beforehand
@@ -83,6 +92,8 @@ public class ScheduledTriggers implements Closeable {
     actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor"));
     // todo make the wait time configurable
     actionThrottle = new ActionThrottle("action", DEFAULT_MIN_MS_BETWEEN_ACTIONS);
+    this.zkClient = zkClient;
+    queueStats = new Overseer.Stats();
   }
 
   /**
@@ -97,7 +108,7 @@ public class ScheduledTriggers implements Closeable {
     if (isClosed) {
       throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used anymore");
     }
-    ScheduledTrigger scheduledTrigger = new ScheduledTrigger(newTrigger);
+    ScheduledTrigger scheduledTrigger = new ScheduledTrigger(newTrigger, zkClient, queueStats);
     ScheduledTrigger old = scheduledTriggers.putIfAbsent(newTrigger.getName(), scheduledTrigger);
     if (old != null) {
       if (old.trigger.equals(newTrigger)) {
@@ -109,38 +120,55 @@ public class ScheduledTriggers implements Closeable {
       scheduledTriggers.replace(newTrigger.getName(), scheduledTrigger);
     }
     newTrigger.setListener(event -> {
-      AutoScaling.Trigger source = event.getSource();
+      ScheduledTrigger scheduledSource = scheduledTriggers.get(event.getSource());
+      if (scheduledSource == null) {
+        log.warn("Ignoring autoscaling event " + event + " because the source trigger: " + event.getSource() + " doesn't exist.");
+        // XXX not sure what to return here...
+        return true;
+      }
+      AutoScaling.Trigger source = scheduledSource.trigger;
       if (source.isClosed()) {
-        log.warn("Ignoring autoscaling event because the source trigger: " + source + " has already been closed");
+        log.warn("Ignoring autoscaling event " + event + " because the source trigger: " + source + " has already been closed");
         // we do not want to lose this event just because the trigger were closed, perhaps a replacement will need it
         return false;
       }
-      if (hasPendingActions.compareAndSet(false, true)) {
-        List<TriggerAction> actions = source.getActions();
-        if (actions != null) {
-          actionExecutor.submit(() -> {
-            assert hasPendingActions.get();
-            try {
-              // let the action executor thread wait instead of the trigger thread so we use the throttle here
-              actionThrottle.minimumWaitBetweenActions();
-              actionThrottle.markAttemptingAction();
-              for (TriggerAction action : actions) {
-                try {
-                  action.process(event);
-                } catch (Exception e) {
-                  log.error("Error executing action: " + action.getName() + " for trigger event: " + event, e);
-                  throw e;
+      boolean replaying = event.getProperty(TriggerEventBase.REPLAYING) != null ? (Boolean)event.getProperty(TriggerEventBase.REPLAYING) : false;
+      boolean enqueued = false;
+      try {
+        if (!replaying) {
+          enqueued = scheduledTrigger.enqueue(event);
+        }
+        if (hasPendingActions.compareAndSet(false, true)) {
+          List<TriggerAction> actions = source.getActions();
+          if (actions != null) {
+            actionExecutor.submit(() -> {
+              assert hasPendingActions.get();
+              try {
+                // let the action executor thread wait instead of the trigger thread so we use the throttle here
+                actionThrottle.minimumWaitBetweenActions();
+                actionThrottle.markAttemptingAction();
+                for (TriggerAction action : actions) {
+                  try {
+                    action.process(event);
+                  } catch (Exception e) {
+                    log.error("Error executing action: " + action.getName() + " for trigger event: " + event, e);
+                    throw e;
+                  }
                 }
+              } finally {
+                hasPendingActions.set(false);
               }
-            } finally {
-              hasPendingActions.set(false);
-            }
-          });
+            });
+          }
+          return true;
+        } else {
+          // there is an action in the queue and we don't want to enqueue another until it is complete
+          return false;
+        }
+      } finally {
+        if (enqueued) {
+          scheduledTrigger.dequeue();
         }
-        return true;
-      } else {
-        // there is an action in the queue and we don't want to enqueue another until it is complete
-        return false;
       }
     });
     scheduledTrigger.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(scheduledTrigger, 0, DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS);
@@ -150,12 +178,8 @@ public class ScheduledTriggers implements Closeable {
    * Removes and stops the trigger with the given name
    *
    * @param triggerName the name of the trigger to be removed
-   * @throws AlreadyClosedException if this class has already been closed
    */
   public synchronized void remove(String triggerName) {
-    if (isClosed) {
-      throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used any more");
-    }
     ScheduledTrigger removed = scheduledTriggers.remove(triggerName);
     IOUtils.closeQuietly(removed);
   }
@@ -184,13 +208,28 @@ public class ScheduledTriggers implements Closeable {
   private class ScheduledTrigger implements Runnable, Closeable {
     AutoScaling.Trigger trigger;
     ScheduledFuture<?> scheduledFuture;
+    TriggerEventQueue queue;
 
-    ScheduledTrigger(AutoScaling.Trigger trigger) {
+    ScheduledTrigger(AutoScaling.Trigger trigger, SolrZkClient zkClient, Overseer.Stats stats) {
       this.trigger = trigger;
+      this.queue = new TriggerEventQueue(zkClient, trigger.getName(), stats);
+    }
+
+    public boolean enqueue(AutoScaling.TriggerEvent event) {
+      return queue.offerEvent(event);
+    }
+
+    public AutoScaling.TriggerEvent dequeue() {
+      return queue.pollEvent();
     }
 
     @Override
     public void run() {
+      // replay accumulated events first, if any
+      AutoScaling.TriggerEvent event;
+      while ((event = queue.pollEvent()) != null) {
+        trigger.getListener().triggerFired(event);
+      }
       // fire a trigger only if an action is not pending
       // note this is not fool proof e.g. it does not prevent an action being executed while a trigger
       // is still executing. There is additional protection against that scenario in the event listener.

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/24727071/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventBase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventBase.java
new file mode 100644
index 0000000..2290e40
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventBase.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for event implementations.
+ */
+public abstract class TriggerEventBase implements AutoScaling.TriggerEvent {
+  public static final String REPLAYING = "replaying";
+
+  protected final String source;
+  protected final long eventNanoTime;
+  protected final AutoScaling.EventType eventType;
+  protected final Map<String, Object> properties = new HashMap<>();
+
+  protected TriggerEventBase(AutoScaling.EventType eventType, String source, long eventNanoTime,
+                             Map<String, Object> properties) {
+    this.eventType = eventType;
+    this.source = source;
+    this.eventNanoTime = eventNanoTime;
+    if (properties != null) {
+      this.properties.putAll(properties);
+    }
+  }
+
+  @Override
+  public String getSource() {
+    return source;
+  }
+
+  @Override
+  public long getEventNanoTime() {
+    return eventNanoTime;
+  }
+
+  @Override
+  public Map<String, Object> getProperties() {
+    return properties;
+  }
+
+  @Override
+  public Object getProperty(String name) {
+    return properties.get(name);
+  }
+
+  @Override
+  public AutoScaling.EventType getEventType() {
+    return eventType;
+  }
+
+  @Override
+  public void setProperties(Map<String, Object> context) {
+    this.properties.clear();
+    if (context != null) {
+      this.properties.putAll(context);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return this.getClass().getSimpleName() + "{" +
+        "source='" + source + '\'' +
+        ", eventNanoTime=" + eventNanoTime +
+        ", properties=" + properties +
+        '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/24727071/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
new file mode 100644
index 0000000..3bf79a6
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
@@ -0,0 +1,80 @@
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.solr.cloud.DistributedQueue;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
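+/**
+ * A {@link DistributedQueue} of trigger events for a single trigger, rooted at {@link ZkStateReader#SOLR_AUTOSCALING_EVENTS_PATH}/&lt;triggerName&gt;.
+ */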
+public class TriggerEventQueue extends DistributedQueue {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public static class ReplayingEvent extends TriggerEventBase {
+
+    public ReplayingEvent(AutoScaling.EventType eventType, String source, long eventNanoTime, Map<String, Object> properties) {
+      super(eventType, source, eventNanoTime, properties);
+      this.properties.put(REPLAYING, true);
+    }
+  }
+
+  private final String triggerName;
+
+  public TriggerEventQueue(SolrZkClient zookeeper, String triggerName, Overseer.Stats stats) {
+    super(zookeeper, ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH + "/" + triggerName, stats);
+    this.triggerName = triggerName;
+  }
+
+  public boolean offerEvent(AutoScaling.TriggerEvent event) {
+    try {
+      // yuck, serializing simple beans should be supported by Utils...
+      Map<String, Object> map = new HashMap<>();
+      map.put("eventType", event.getEventType().toString());
+      map.put("source", event.getSource());
+      map.put("eventNanoTime", event.getEventNanoTime());
+      map.put("properties", event.getProperties());
+      byte[] data = Utils.toJSON(map);
+      offer(data);
+      return true;
+    } catch (KeeperException | InterruptedException e) {
+      LOG.warn("Exception adding event " + event + " to queue " + triggerName, e);
+      return false;
+    }
+  }
+
+  public AutoScaling.TriggerEvent pollEvent() {
+    byte[] data;
+    try {
+      while ((data = poll()) != null) {
+        if (data.length == 0) {
+          LOG.warn("ignoring empty data...");
+          continue;
+        }
+        try {
+          Map<String, Object> map = (Map<String, Object>) Utils.fromJSON(data);
+          String source = (String)map.get("source");
+          long eventNanoTime = ((Number)map.get("eventNanoTime")).longValue();
+          AutoScaling.EventType eventType = AutoScaling.EventType.valueOf((String)map.get("eventType"));
+          Map<String, Object> properties = (Map<String, Object>)map.get("properties");
+          return new ReplayingEvent(eventType, source, eventNanoTime, properties);
+        } catch (Exception e) {
+          LOG.warn("Invalid event data, ignoring: " + new String(data));
+          continue;
+        }
+      }
+    } catch (KeeperException | InterruptedException e) {
+      LOG.warn("Exception polling queue of trigger " + triggerName, e);
+    }
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/24727071/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
index b0405cf..f916382 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
@@ -84,7 +84,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
 
       NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = eventRef.get();
       assertNotNull(nodeAddedEvent);
-      assertEquals("", newNode.getNodeName(), nodeAddedEvent.getNodeName());
+      assertEquals("", newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME));
     }
 
     // add a new node but remove it before the waitFor period expires
@@ -210,7 +210,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
       assertTrue(fired.get());
       NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = eventRef.get();
       assertNotNull(nodeAddedEvent);
-      assertEquals("", newNode.getNodeName(), nodeAddedEvent.getNodeName());
+      assertEquals("", newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/24727071/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
index efa63d3..6d2e4a7 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
@@ -85,7 +85,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
 
       NodeLostTrigger.NodeLostEvent nodeLostEvent = eventRef.get();
       assertNotNull(nodeLostEvent);
-      assertEquals("", lostNodeName, nodeLostEvent.getNodeName());
+      assertEquals("", lostNodeName, nodeLostEvent.getProperty(NodeLostTrigger.NodeLostEvent.NODE_NAME));
 
     }
 
@@ -235,7 +235,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
 
       NodeLostTrigger.NodeLostEvent nodeLostEvent = eventRef.get();
       assertNotNull(nodeLostEvent);
-      assertEquals("", lostNodeName, nodeLostEvent.getNodeName());
+      assertEquals("", lostNodeName, nodeLostEvent.getProperty(NodeLostTrigger.NodeLostEvent.NODE_NAME));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/24727071/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index 545c0d6..34e8587 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -202,16 +202,16 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
         if (lastActionExecutedAt.get() != 0)  {
           log.info("last action at " + lastActionExecutedAt.get() + " nano time = " + System.nanoTime());
           if (System.nanoTime() - lastActionExecutedAt.get() < TimeUnit.NANOSECONDS.convert(ScheduledTriggers.DEFAULT_MIN_MS_BETWEEN_ACTIONS - DELTA_MS, TimeUnit.MILLISECONDS)) {
-            log.info("action executed again before minimum wait time from {}", event.getSource().getName());
+            log.info("action executed again before minimum wait time from {}", event.getSource());
             fail("TriggerListener was fired before the throttling period");
           }
         }
         if (onlyOnce.compareAndSet(false, true)) {
-          log.info("action executed from {}", event.getSource().getName());
+          log.info("action executed from {}", event.getSource());
           lastActionExecutedAt.set(System.nanoTime());
           triggerFiredLatch.countDown();
         } else  {
-          log.info("action executed more than once from {}", event.getSource().getName());
+          log.info("action executed more than once from {}", event.getSource());
           fail("Trigger should not have fired more than once!");
         }
       } finally {
@@ -287,7 +287,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) eventRef.get();
     assertNotNull(nodeLostEvent);
     assertEquals("The node added trigger was fired but for a different node",
-        nodeName, nodeLostEvent.getNodeName());
+        nodeName, nodeLostEvent.getProperty(NodeLostTrigger.NodeLostEvent.NODE_NAME));
   }
 
   @Test
@@ -345,7 +345,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
     assertNotNull(nodeAddedEvent);
     assertEquals("The node added trigger was fired but for a different node",
-        newNode.getNodeName(), nodeAddedEvent.getNodeName());
+        newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME));
   }
 
   @Test
@@ -374,7 +374,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
     assertNotNull(nodeAddedEvent);
     assertEquals("The node added trigger was fired but for a different node",
-        newNode.getNodeName(), nodeAddedEvent.getNodeName());
+        newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME));
   }
 
   @Test
@@ -413,7 +413,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) eventRef.get();
     assertNotNull(nodeLostEvent);
     assertEquals("The node lost trigger was fired but for a different node",
-        lostNodeName, nodeLostEvent.getNodeName());
+        lostNodeName, nodeLostEvent.getProperty(NodeLostTrigger.NodeLostEvent.NODE_NAME));
   }
 
   public static class TestTriggerAction implements TriggerAction {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/24727071/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 0bcd9ca..45995b8 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -92,6 +92,7 @@ public class ZkStateReader implements Closeable {
   public static final String REJOIN_AT_HEAD_PROP = "rejoinAtHead";
   public static final String SOLR_SECURITY_CONF_PATH = "/security.json";
   public static final String SOLR_AUTOSCALING_CONF_PATH = "/autoscaling.json";
+  public static final String SOLR_AUTOSCALING_EVENTS_PATH = "/autoscaling/events";
 
   public static final String REPLICATION_FACTOR = "replicationFactor";
   public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";