You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/06/01 14:04:45 UTC

[3/3] lucene-solr:jira/solr-10745: SOLR-10745: Fix buggy element removal in triggers. Move marker processing to trigger init(). Clean old markers on Overseer start if there are no triggers to consume them.

SOLR-10745: Fix buggy element removal in triggers. Move marker processing to
trigger init(). Clean old markers on Overseer start if there are no triggers to
consume them.


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/b67de922
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/b67de922
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/b67de922

Branch: refs/heads/jira/solr-10745
Commit: b67de922e409d8c88b9e82655ed4a3bf0d35d4f7
Parents: c0f3f0d
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Thu Jun 1 16:02:13 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Thu Jun 1 16:02:13 2017 +0200

----------------------------------------------------------------------
 .../org/apache/solr/cloud/ZkController.java     | 12 +--
 .../cloud/autoscaling/AutoScalingConfig.java    | 97 ++++++++++++++++++++
 .../cloud/autoscaling/NodeAddedTrigger.java     | 60 +++++++-----
 .../solr/cloud/autoscaling/NodeLostTrigger.java | 56 ++++++-----
 .../autoscaling/OverseerTriggerThread.java      | 52 ++++++++++-
 .../solr/cloud/autoscaling/TriggerBase.java     |  8 +-
 .../solr/cloud/autoscaling/TestPolicyCloud.java |  1 -
 .../autoscaling/TriggerIntegrationTest.java     | 91 +++++++++++-------
 8 files changed, 285 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index a9004b8..37019ec 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -741,11 +741,9 @@ public class ZkController {
     LiveNodesListener listener = (oldNodes, newNodes) -> {
       oldNodes.removeAll(newNodes);
       if (oldNodes.isEmpty()) { // only added nodes
-        log.debug("-- skip, only new nodes: " + newNodes);
         return;
       }
       if (isClosed) {
-        log.debug("-- skip, closed: old=" + oldNodes + ", new=" + newNodes);
         return;
       }
       // if this node is in the top three then attempt to create nodeLost message
@@ -755,21 +753,17 @@ public class ZkController {
           break;
         }
         if (i > 2) {
-          log.debug("-- skip, " + getNodeName() + " not in the top 3 of " + newNodes);
           return; // this node is not in the top three
         }
         i++;
       }
+
       for (String n : oldNodes) {
         String path = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + n;
         try {
-          // nocommit decide between EPHEMERAL vs. PERSISTENT, the latter needs
-          // explicit cleanup on cluster restart if there are no nodeLost triggers
           zkClient.create(path, null, CreateMode.PERSISTENT, true);
-          log.debug("-- created " + path);
         } catch (KeeperException.NodeExistsException e) {
           // someone else already created this node - ignore
-          log.debug("-- skip, already exists " + path);
         } catch (KeeperException | InterruptedException e1) {
           log.warn("Unable to register nodeLost path for " + n, e1);
         }
@@ -857,8 +851,8 @@ public class ZkController {
     List<Op> ops = new ArrayList<>(2);
     ops.add(Op.create(nodePath, null, zkClient.getZkACLProvider().getACLsToAdd(nodePath), CreateMode.EPHEMERAL));
     if (!zkClient.exists(nodeAddedPath, true)) {
-      // nocommit use EPHEMERAL or PERSISTENT?
-      // EPHEMERAL will disappear if this node shuts down, PERSISTENT will need an explicit cleanup
+      // use EPHEMERAL so that it disappears if this node goes down
+      // and no other action is taken
       ops.add(Op.create(nodeAddedPath, null, zkClient.getZkACLProvider().getACLsToAdd(nodeAddedPath), CreateMode.EPHEMERAL));
     }
     zkClient.multi(ops, true);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
new file mode 100644
index 0000000..2877cb9
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
@@ -0,0 +1,97 @@
+package org.apache.solr.cloud.autoscaling;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.solr.common.params.AutoScalingParams;
+import org.apache.solr.common.util.Utils;
+
+/**
+ * Simple bean representation of <code>autoscaling.json</code>, which parses data
+ * lazily.
+ */
+public class AutoScalingConfig {
+
+  private final Map<String, Object> jsonMap;
+
+  private Policy policy;
+  private Map<String, TriggerConfig> triggers;
+  private Map<String, ListenerConfig> listeners;
+
+  /**
+   * Bean representation of {@link org.apache.solr.cloud.autoscaling.AutoScaling.TriggerListener} config.
+   */
+  public static class ListenerConfig {
+    public String trigger;
+    public List<String> stages;
+    public String listenerClass;
+    public List<Map<String, String>> beforeActions;
+    public List<Map<String, String>> afterActions;
+
+    public ListenerConfig(Map<String, Object> properties) {
+      trigger = (String)properties.get(AutoScalingParams.TRIGGER);
+      stages = (List<String>)properties.getOrDefault(AutoScalingParams.STAGE, Collections.emptyList());
+      listenerClass = (String)properties.get(AutoScalingParams.CLASS);
+      beforeActions = (List<Map<String, String>>)properties.getOrDefault(AutoScalingParams.BEFORE_ACTION, Collections.emptyList());
+      afterActions = (List<Map<String, String>>)properties.getOrDefault(AutoScalingParams.AFTER_ACTION, Collections.emptyList());
+    }
+  }
+
+  /**
+   * Bean representation of {@link org.apache.solr.cloud.autoscaling.AutoScaling.Trigger} config.
+   */
+  public static class TriggerConfig {
+    public final AutoScaling.EventType eventType;
+    public final Map<String, Object> properties = new HashMap<>();
+
+    public TriggerConfig(Map<String, Object> properties) {
+      String event = (String) properties.get(AutoScalingParams.EVENT);
+      this.eventType = AutoScaling.EventType.valueOf(event.toUpperCase(Locale.ROOT));
+      this.properties.putAll(properties);
+    }
+  }
+
+  public AutoScalingConfig(Map<String, Object> jsonMap) {
+    this.jsonMap = Utils.getDeepCopy(jsonMap, 10);
+  }
+
+  public Policy getPolicy() {
+    if (policy == null) {
+      policy = new Policy(jsonMap);
+    }
+    return policy;
+  }
+
+  public Map<String, TriggerConfig> getTriggerConfigs() {
+    if (triggers == null) {
+      Map<String, Object> trigMap = (Map<String, Object>)jsonMap.get("triggers");
+      if (trigMap == null) {
+        triggers = Collections.emptyMap();
+      } else {
+        triggers = new HashMap<>(trigMap.size());
+        for (Map.Entry<String, Object> entry : trigMap.entrySet()) {
+          triggers.put(entry.getKey(), new TriggerConfig((Map<String, Object>)entry.getValue()));
+        }
+      }
+    }
+    return triggers;
+  }
+
+  public Map<String, ListenerConfig> getListenerConfigs() {
+    if (listeners == null) {
+      Map<String, Object> map = (Map<String, Object>)jsonMap.get("listeners");
+      if (map == null) {
+        listeners = Collections.emptyMap();
+      } else {
+        listeners = new HashMap<>(map.size());
+        for (Map.Entry<String, Object> entry : map.entrySet()) {
+          listeners.put(entry.getKey(), new ListenerConfig((Map<String, Object>)entry.getValue()));
+        }
+      }
+    }
+    return listeners;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
index f304ba7..7a46fc7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
@@ -24,12 +24,12 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -62,7 +62,7 @@ public class NodeAddedTrigger extends TriggerBase {
 
   private Set<String> lastLiveNodes;
 
-  private Map<String, Long> nodeNameVsTimeAdded = new ConcurrentHashMap<>();
+  private Map<String, Long> nodeNameVsTimeAdded = new HashMap<>();
 
   public NodeAddedTrigger(String name, Map<String, Object> properties,
                           CoreContainer container) {
@@ -99,6 +99,20 @@ public class NodeAddedTrigger extends TriggerBase {
         actions.get(i).init(map);
       }
     }
+    // pick up added nodes for which marker paths were created
+    try {
+      List<String> added = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true);
+      added.forEach(n -> {
+        log.debug("Adding node from marker path: {}", n);
+        nodeNameVsTimeAdded.put(n, timeSource.getTime());
+        removeNodeAddedMarker(n);
+      });
+    } catch (KeeperException.NoNodeException e) {
+      // ignore
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Exception retrieving nodeLost markers", e);
+    }
+
   }
 
   @Override
@@ -225,28 +239,13 @@ public class NodeAddedTrigger extends TriggerBase {
       copyOfNew.removeAll(lastLiveNodes);
       copyOfNew.forEach(n -> {
         long eventTime = timeSource.getTime();
-        nodeNameVsTimeAdded.put(n, eventTime);
         log.debug("Tracking new node: {} at time {}", n, eventTime);
+        nodeNameVsTimeAdded.put(n, eventTime);
       });
 
-      // pick up added nodes for which marker paths were created
-      try {
-        List<String> lost = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true);
-        lost.forEach(n -> {
-          log.debug("Adding node from marker path: {}", n);
-          nodeNameVsTimeAdded.put(n, timeSource.getTime());
-          try {
-            container.getZkController().getZkClient().delete(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + n, -1, true);
-          } catch (KeeperException | InterruptedException e) {
-            log.debug("Exception removing nodeAdded marker " + n, e);
-          }
-        });
-      } catch (KeeperException | InterruptedException e) {
-        log.warn("Exception retrieving nodeLost markers", e);
-      }
-
       // has enough time expired to trigger events for a node?
-      for (Map.Entry<String, Long> entry : nodeNameVsTimeAdded.entrySet()) {
+      for (Iterator<Map.Entry<String, Long>> it = nodeNameVsTimeAdded.entrySet().iterator(); it.hasNext(); ) {
+        Map.Entry<String, Long> entry = it.next();
         String nodeName = entry.getKey();
         Long timeAdded = entry.getValue();
         long now = timeSource.getTime();
@@ -257,20 +256,35 @@ public class NodeAddedTrigger extends TriggerBase {
             log.debug("NodeAddedTrigger {} firing registered listener for node: {} added at time {} , now: {}", name, nodeName, timeAdded, now);
             if (listener.triggerFired(new NodeAddedEvent(getEventType(), getName(), timeAdded, nodeName))) {
               // remove from tracking set only if the fire was accepted
-              trackingKeySet.remove(nodeName);
+              it.remove();
+              removeNodeAddedMarker(nodeName);
             }
           } else  {
-            trackingKeySet.remove(nodeName);
+            it.remove();
+            removeNodeAddedMarker(nodeName);
           }
         }
       }
-
       lastLiveNodes = new HashSet(newLiveNodes);
     } catch (RuntimeException e) {
       log.error("Unexpected exception in NodeAddedTrigger", e);
     }
   }
 
+  private void removeNodeAddedMarker(String nodeName) {
+    String path = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeName;
+    try {
+      if (container.getZkController().getZkClient().exists(path, true)) {
+        container.getZkController().getZkClient().delete(path, -1, true);
+      }
+    } catch (KeeperException.NoNodeException e) {
+      // ignore
+    } catch (KeeperException | InterruptedException e) {
+      log.debug("Exception removing nodeAdded marker " + nodeName, e);
+    }
+
+  }
+
   @Override
   public boolean isClosed() {
     synchronized (this) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
index 6450bda..2af4cc5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -24,12 +24,12 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -62,7 +62,7 @@ public class NodeLostTrigger extends TriggerBase {
 
   private Set<String> lastLiveNodes;
 
-  private Map<String, Long> nodeNameVsTimeRemoved = new ConcurrentHashMap<>();
+  private Map<String, Long> nodeNameVsTimeRemoved = new HashMap<>();
 
   public NodeLostTrigger(String name, Map<String, Object> properties,
                          CoreContainer container) {
@@ -98,6 +98,19 @@ public class NodeLostTrigger extends TriggerBase {
         actions.get(i).init(map);
       }
     }
+    // pick up lost nodes for which marker paths were created
+    try {
+      List<String> lost = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true);
+      lost.forEach(n -> {
+        log.debug("Adding lost node from marker path: {}", n);
+        nodeNameVsTimeRemoved.put(n, timeSource.getTime());
+        removeNodeLostMarker(n);
+      });
+    } catch (KeeperException.NoNodeException e) {
+      // ignore
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Exception retrieving nodeLost markers", e);
+    }
   }
 
   @Override
@@ -227,24 +240,9 @@ public class NodeLostTrigger extends TriggerBase {
         nodeNameVsTimeRemoved.put(n, timeSource.getTime());
       });
 
-      // pick up lost nodes for which marker paths were created
-      try {
-        List<String> lost = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true);
-        lost.forEach(n -> {
-          log.debug("Adding lost node from marker path: {}", n);
-          nodeNameVsTimeRemoved.put(n, timeSource.getTime());
-          try {
-            container.getZkController().getZkClient().delete(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + n, -1, true);
-          } catch (KeeperException | InterruptedException e) {
-            log.warn("Exception removing nodeLost marker " + n, e);
-          }
-        });
-      } catch (KeeperException | InterruptedException e) {
-        log.warn("Exception retrieving nodeLost markers", e);
-      }
-
       // has enough time expired to trigger events for a node?
-      for (Map.Entry<String, Long> entry : nodeNameVsTimeRemoved.entrySet()) {
+      for (Iterator<Map.Entry<String, Long>> it = nodeNameVsTimeRemoved.entrySet().iterator(); it.hasNext(); ) {
+        Map.Entry<String, Long> entry = it.next();
         String nodeName = entry.getKey();
         Long timeRemoved = entry.getValue();
         if (TimeUnit.SECONDS.convert(timeSource.getTime() - timeRemoved, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
@@ -253,20 +251,34 @@ public class NodeLostTrigger extends TriggerBase {
           if (listener != null) {
             log.debug("NodeLostTrigger firing registered listener");
             if (listener.triggerFired(new NodeLostEvent(getEventType(), getName(), timeRemoved, nodeName)))  {
-              trackingKeySet.remove(nodeName);
+              it.remove();
+              removeNodeLostMarker(nodeName);
             }
           } else  {
-            trackingKeySet.remove(nodeName);
+            it.remove();
+            removeNodeLostMarker(nodeName);
           }
         }
       }
-
       lastLiveNodes = new HashSet<>(newLiveNodes);
     } catch (RuntimeException e) {
       log.error("Unexpected exception in NodeLostTrigger", e);
     }
   }
 
+  private void removeNodeLostMarker(String nodeName) {
+    String path = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeName;
+    try {
+      if (container.getZkController().getZkClient().exists(path, true)) {
+        container.getZkController().getZkClient().delete(path, -1, true);
+      }
+    } catch (KeeperException.NoNodeException e) {
+      // ignore
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Exception removing nodeLost marker " + nodeName, e);
+    }
+  }
+
   @Override
   public boolean isClosed() {
     synchronized (this) {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
index 4a89ce7..91146b6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
@@ -35,6 +35,7 @@ import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.params.AutoScalingParams;
 import org.apache.solr.common.util.IOUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
@@ -169,10 +170,59 @@ public class OverseerTriggerThread implements Runnable, Closeable {
           scheduledTriggers.remove(managedTriggerName);
         }
       }
+      // check for nodeLost and nodeAdded triggers in the current config; if
+      // either kind is absent, clean up its old markers below
+      boolean cleanOldNodeLostMarkers = true;
+      boolean cleanOldNodeAddedMarkers = true;
       // add new triggers and/or replace and close the replaced triggers
       for (Map.Entry<String, AutoScaling.Trigger> entry : copy.entrySet()) {
+        if (entry.getValue().getEventType().equals(AutoScaling.EventType.NODELOST)) {
+          cleanOldNodeLostMarkers = false;
+        }
+        if (entry.getValue().getEventType().equals(AutoScaling.EventType.NODEADDED)) {
+          cleanOldNodeAddedMarkers = false;
+        }
         scheduledTriggers.add(entry.getValue());
       }
+      if (cleanOldNodeLostMarkers) {
+        log.debug("-- clean old nodeLost markers");
+        try {
+          List<String> markers = zkClient.getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true);
+          markers.forEach(n -> {
+            removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, n);
+          });
+        } catch (KeeperException.NoNodeException e) {
+          // ignore
+        } catch (KeeperException | InterruptedException e) {
+          log.warn("Error removing old nodeLost markers", e);
+        }
+      }
+      if (cleanOldNodeAddedMarkers) {
+        log.debug("-- clean old nodeAdded markers");
+        try {
+          List<String> markers = zkClient.getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true);
+          markers.forEach(n -> {
+            removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, n);
+          });
+        } catch (KeeperException.NoNodeException e) {
+          // ignore
+        } catch (KeeperException | InterruptedException e) {
+          log.warn("Error removing old nodeAdded markers", e);
+        }
+
+      }
+    }
+  }
+
+  private void removeNodeMarker(String path, String nodeName) {
+    path = path + "/" + nodeName;
+    try {
+      zkClient.delete(path, -1, true);
+      log.debug("  -- deleted " + path);
+    } catch (KeeperException.NoNodeException e) {
+      // ignore
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Error removing old marker " + path, e);
     }
   }
 
@@ -250,7 +300,7 @@ public class OverseerTriggerThread implements Runnable, Closeable {
 
     for (Map.Entry<String, Object> entry : triggers.entrySet()) {
       Map<String, Object> props = (Map<String, Object>) entry.getValue();
-      String event = (String) props.get("event");
+      String event = (String) props.get(AutoScalingParams.EVENT);
       AutoScaling.EventType eventType = AutoScaling.EventType.valueOf(event.toUpperCase(Locale.ROOT));
       String triggerName = entry.getKey();
       triggerMap.put(triggerName, triggerFactory.create(eventType, triggerName, props));

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
index ef9a3cf..7aff846 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
@@ -97,11 +97,11 @@ public abstract class TriggerBase implements AutoScaling.Trigger {
       LOG.warn("Exception getting trigger state '" + path + "'", e);
     }
     if (data != null) {
-      Map<String, Object> state = (Map<String, Object>)Utils.fromJSON(data);
+      Map<String, Object> restoredState = (Map<String, Object>)Utils.fromJSON(data);
       // make sure lastState is sorted
-      state = Utils.getDeepCopy(state, 10, false, true);;
-      setState(state);
-      lastState = state;
+      restoredState = Utils.getDeepCopy(restoredState, 10, false, true);
+      setState(restoredState);
+      lastState = restoredState;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
index 5038278..a809873 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
@@ -30,7 +30,6 @@ import org.apache.solr.cloud.OverseerTaskProcessor;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.Utils;
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
 import org.junit.BeforeClass;

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index b2c95b7..ae3f72d 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -139,6 +139,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     paths.forEach(n -> {
       try {
         ZKUtil.deleteRecursive(zkClient().getSolrZooKeeper(), path + "/" + n);
+      } catch (KeeperException.NoNodeException e) {
+        // ignore
       } catch (KeeperException | InterruptedException e) {
         log.warn("Error deleting old data", e);
       }
@@ -634,10 +636,11 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
 
     @Override
     public void process(TriggerEvent event) {
+      log.info("-- event: " + event);
       events.add(event);
       getActionStarted().countDown();
       try {
-        Thread.sleep(5000);
+        Thread.sleep(eventQueueActionWait);
         triggerFired.compareAndSet(false, true);
         getActionCompleted().countDown();
       } catch (InterruptedException e) {
@@ -658,6 +661,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     }
   }
 
+  public static long eventQueueActionWait = 5000;
+
   @Test
   public void testEventQueue() throws Exception {
     CloudSolrClient solrClient = cluster.getSolrClient();
@@ -696,19 +701,20 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     assertNotNull(nodeAddedEvent);
     // but action did not complete yet so the event is still enqueued
     assertFalse(triggerFired.get());
+    events.clear();
     actionStarted = new CountDownLatch(1);
+    eventQueueActionWait = 1;
     // kill overseer leader
     cluster.stopJettySolrRunner(overseerLeaderIndex);
     Thread.sleep(5000);
+    // new overseer leader should be elected and run triggers
     await = actionInterrupted.await(3, TimeUnit.SECONDS);
     assertTrue("action wasn't interrupted", await);
-    // new overseer leader should be elected and run triggers
-    newNode = cluster.startJettySolrRunner();
-    // it should fire again but not complete yet
+    // it should fire again from enqueued event
     await = actionStarted.await(60, TimeUnit.SECONDS);
     TriggerEvent replayedEvent = events.iterator().next();
     assertTrue(replayedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME) != null);
-    assertTrue(replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME) != null);
+    assertTrue(events + "\n" + replayedEvent.toString(), replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME) != null);
     await = actionCompleted.await(10, TimeUnit.SECONDS);
     assertTrue(triggerFired.get());
   }
@@ -743,6 +749,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
       }
     }
 
+    events.clear();
+
     JettySolrRunner newNode = cluster.startJettySolrRunner();
     boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
     assertTrue("The trigger did not fire at all", await);
@@ -797,8 +805,6 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
   }
 
   public static class TestEventMarkerAction implements TriggerAction {
-    // sanity check that an action instance is only invoked once
-    private final AtomicBoolean onlyOnce = new AtomicBoolean(false);
 
     public TestEventMarkerAction() {
       actionConstructorCalled.countDown();
@@ -806,7 +812,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
 
     @Override
     public String getName() {
-      return "TestTriggerAction";
+      return "TestEventMarkerAction";
     }
 
     @Override
@@ -841,17 +847,17 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
 
     @Override
     public void init(Map<String, String> args) {
-      log.info("TestTriggerAction init");
+      log.info("TestEventMarkerAction init");
       actionInitCalled.countDown();
     }
   }
 
   @Test
-  public void testNodeEventsRegistration() throws Exception {
+  public void testNodeMarkersRegistration() throws Exception {
     // for this test we want to create two triggers so we must assert that the actions were created twice
     actionInitCalled = new CountDownLatch(2);
     // similarly we want both triggers to fire
-    triggerFiredLatch = new CountDownLatch(3);
+    triggerFiredLatch = new CountDownLatch(2);
     TestLiveNodesListener listener = registerLiveNodesListener();
 
     NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
@@ -875,7 +881,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node.getNodeName();
     assertTrue("Path " + pathAdded + " wasn't created", zkClient().exists(pathAdded, true));
     listener.reset();
-    // stop overseer, which should also cause nodeLost event
+    // stop overseer
+    log.info("====== KILL OVERSEER 1");
     cluster.stopJettySolrRunner(overseerLeaderIndex);
     if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
       fail("onChange listener didn't execute on cluster change");
@@ -883,12 +890,16 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     assertEquals(1, listener.lostNodes.size());
     assertEquals(overseerLeader, listener.lostNodes.iterator().next());
     assertEquals(0, listener.addedNodes.size());
-    // verify that a znode exists
+    // wait until the new overseer is up
+    Thread.sleep(5000);
+    // verify that a znode does NOT exist - there's no nodeLost trigger,
+    // so the new overseer cleaned up existing nodeLost markers
     String pathLost = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + overseerLeader;
-    assertTrue("Path " + pathLost + " wasn't created", zkClient().exists(pathLost, true));
+    assertFalse("Path " + pathLost + " exists", zkClient().exists(pathLost, true));
 
     listener.reset();
     // create another node
+    log.info("====== ADD NODE 1");
     JettySolrRunner node1 = cluster.startJettySolrRunner();
     if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
       fail("onChange listener didn't execute on cluster change");
@@ -902,6 +913,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     // set up triggers
     CloudSolrClient solrClient = cluster.getSolrClient();
 
+    log.info("====== ADD TRIGGERS");
     String setTriggerCommand = "{" +
         "'set-trigger' : {" +
         "'name' : 'node_added_trigger'," +
@@ -926,25 +938,40 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
     response = solrClient.request(req);
     assertEquals(response.get("result").toString(), "success");
 
-    if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
-      fail("Both triggers should have fired by now");
-    }
-    assertEquals(3, events.size());
-    // 2 nodeAdded, 1 nodeLost
-    int nodeAdded = 0;
-    int nodeLost = 0;
-    for (TriggerEvent ev : events) {
-      String nodeName = (String)ev.getProperty(TriggerEvent.NODE_NAME);
-      if (ev.eventType.equals(AutoScaling.EventType.NODELOST)) {
-        assertEquals(overseerLeader, nodeName);
-        nodeLost++;
-      } else if (ev.eventType.equals(AutoScaling.EventType.NODEADDED)) {
-        assertTrue(nodeName + " not one of: " + node.getNodeName() + ", " + node1.getNodeName(),
-            nodeName.equals(node.getNodeName()) || nodeName.equals(node1.getNodeName()));
-        nodeAdded++;
+    overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+    overseerLeader = (String) overSeerStatus.get("leader");
+    overseerLeaderIndex = 0;
+    for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+      JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+      if (jetty.getNodeName().equals(overseerLeader)) {
+        overseerLeaderIndex = i;
+        break;
       }
     }
-    assertEquals(1, nodeLost);
-    assertEquals(2, nodeAdded);
+
+    Thread.sleep(5000);
+    // old nodeAdded markers should be consumed now by nodeAdded trigger
+    // but it doesn't result in new events because all nodes have been added
+    // before we configured the trigger
+    assertFalse("Path " + pathAdded + " should have been deleted", zkClient().exists(pathAdded, true));
+
+    listener.reset();
+    events.clear();
+    triggerFiredLatch = new CountDownLatch(1);
+    // kill overseer again
+    log.info("====== KILL OVERSEER 2");
+    cluster.stopJettySolrRunner(overseerLeaderIndex);
+    if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+      fail("onChange listener didn't execute on cluster change");
+    }
+
+
+    if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
+      fail("Trigger should have fired by now");
+    }
+    assertEquals(1, events.size());
+    TriggerEvent ev = events.iterator().next();
+    assertEquals(overseerLeader, ev.getProperty(TriggerEvent.NODE_NAME));
+    assertEquals(AutoScaling.EventType.NODELOST, ev.getEventType());
   }
 }