You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/06/01 14:04:45 UTC
[3/3] lucene-solr:jira/solr-10745: SOLR-10745: Fix buggy element
removal in triggers. Move marker processing to trigger init(). Clean old
markers on Overseer start if there are no triggers to consume them.
SOLR-10745: Fix buggy element removal in triggers. Move marker processing to
trigger init(). Clean old markers on Overseer start if there are no triggers to
consume them.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/b67de922
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/b67de922
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/b67de922
Branch: refs/heads/jira/solr-10745
Commit: b67de922e409d8c88b9e82655ed4a3bf0d35d4f7
Parents: c0f3f0d
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Thu Jun 1 16:02:13 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Thu Jun 1 16:02:13 2017 +0200
----------------------------------------------------------------------
.../org/apache/solr/cloud/ZkController.java | 12 +--
.../cloud/autoscaling/AutoScalingConfig.java | 97 ++++++++++++++++++++
.../cloud/autoscaling/NodeAddedTrigger.java | 60 +++++++-----
.../solr/cloud/autoscaling/NodeLostTrigger.java | 56 ++++++-----
.../autoscaling/OverseerTriggerThread.java | 52 ++++++++++-
.../solr/cloud/autoscaling/TriggerBase.java | 8 +-
.../solr/cloud/autoscaling/TestPolicyCloud.java | 1 -
.../autoscaling/TriggerIntegrationTest.java | 91 +++++++++++-------
8 files changed, 285 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index a9004b8..37019ec 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -741,11 +741,9 @@ public class ZkController {
LiveNodesListener listener = (oldNodes, newNodes) -> {
oldNodes.removeAll(newNodes);
if (oldNodes.isEmpty()) { // only added nodes
- log.debug("-- skip, only new nodes: " + newNodes);
return;
}
if (isClosed) {
- log.debug("-- skip, closed: old=" + oldNodes + ", new=" + newNodes);
return;
}
// if this node is in the top three then attempt to create nodeLost message
@@ -755,21 +753,17 @@ public class ZkController {
break;
}
if (i > 2) {
- log.debug("-- skip, " + getNodeName() + " not in the top 3 of " + newNodes);
return; // this node is not in the top three
}
i++;
}
+
for (String n : oldNodes) {
String path = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + n;
try {
- // nocommit decide between EPHEMERAL vs. PERSISTENT, the latter needs
- // explicit cleanup on cluster restart if there are no nodeLost triggers
zkClient.create(path, null, CreateMode.PERSISTENT, true);
- log.debug("-- created " + path);
} catch (KeeperException.NodeExistsException e) {
// someone else already created this node - ignore
- log.debug("-- skip, already exists " + path);
} catch (KeeperException | InterruptedException e1) {
log.warn("Unable to register nodeLost path for " + n, e1);
}
@@ -857,8 +851,8 @@ public class ZkController {
List<Op> ops = new ArrayList<>(2);
ops.add(Op.create(nodePath, null, zkClient.getZkACLProvider().getACLsToAdd(nodePath), CreateMode.EPHEMERAL));
if (!zkClient.exists(nodeAddedPath, true)) {
- // nocommit use EPHEMERAL or PERSISTENT?
- // EPHEMERAL will disappear if this node shuts down, PERSISTENT will need an explicit cleanup
+ // use EPHEMERAL so that it disappears if this node goes down
+ // and no other action is taken
ops.add(Op.create(nodeAddedPath, null, zkClient.getZkACLProvider().getACLsToAdd(nodeAddedPath), CreateMode.EPHEMERAL));
}
zkClient.multi(ops, true);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
new file mode 100644
index 0000000..2877cb9
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
@@ -0,0 +1,97 @@
+package org.apache.solr.cloud.autoscaling;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.solr.common.params.AutoScalingParams;
+import org.apache.solr.common.util.Utils;
+
+/**
+ * Simple bean representation of <code>autoscaling.json</code>, which parses data
+ * lazily.
+ */
+public class AutoScalingConfig {
+
+ private final Map<String, Object> jsonMap;
+
+ private Policy policy;
+ private Map<String, TriggerConfig> triggers;
+ private Map<String, ListenerConfig> listeners;
+
+ /**
+ * Bean representation of {@link org.apache.solr.cloud.autoscaling.AutoScaling.TriggerListener} config.
+ */
+ public static class ListenerConfig {
+ public String trigger;
+ public List<String> stages;
+ public String listenerClass;
+ public List<Map<String, String>> beforeActions;
+ public List<Map<String, String>> afterActions;
+
+ public ListenerConfig(Map<String, Object> properties) {
+ trigger = (String)properties.get(AutoScalingParams.TRIGGER);
+ stages = (List<String>)properties.getOrDefault(AutoScalingParams.STAGE, Collections.emptyList());
+ listenerClass = (String)properties.get(AutoScalingParams.CLASS);
+ beforeActions = (List<Map<String, String>>)properties.getOrDefault(AutoScalingParams.BEFORE_ACTION, Collections.emptyList());
+ afterActions = (List<Map<String, String>>)properties.getOrDefault(AutoScalingParams.AFTER_ACTION, Collections.emptyList());
+ }
+ }
+
+ /**
+ * Bean representation of {@link org.apache.solr.cloud.autoscaling.AutoScaling.Trigger} config.
+ */
+ public static class TriggerConfig {
+ public final AutoScaling.EventType eventType;
+ public final Map<String, Object> properties = new HashMap<>();
+
+ public TriggerConfig(Map<String, Object> properties) {
+ String event = (String) properties.get(AutoScalingParams.EVENT);
+ this.eventType = AutoScaling.EventType.valueOf(event.toUpperCase(Locale.ROOT));
+ this.properties.putAll(properties);
+ }
+ }
+
+ public AutoScalingConfig(Map<String, Object> jsonMap) {
+ this.jsonMap = Utils.getDeepCopy(jsonMap, 10);
+ }
+
+ public Policy getPolicy() {
+ if (policy == null) {
+ policy = new Policy(jsonMap);
+ }
+ return policy;
+ }
+
+ public Map<String, TriggerConfig> getTriggerConfigs() {
+ if (triggers == null) {
+ Map<String, Object> trigMap = (Map<String, Object>)jsonMap.get("triggers");
+ if (trigMap == null) {
+ triggers = Collections.emptyMap();
+ } else {
+ triggers = new HashMap<>(trigMap.size());
+ for (Map.Entry<String, Object> entry : trigMap.entrySet()) {
+ triggers.put(entry.getKey(), new TriggerConfig((Map<String, Object>)entry.getValue()));
+ }
+ }
+ }
+ return triggers;
+ }
+
+ public Map<String, ListenerConfig> getListenerConfigs() {
+ if (listeners == null) {
+ Map<String, Object> map = (Map<String, Object>)jsonMap.get("listeners");
+ if (map == null) {
+ listeners = Collections.emptyMap();
+ } else {
+ listeners = new HashMap<>(map.size());
+ for (Map.Entry<String, Object> entry : map.entrySet()) {
+ listeners.put(entry.getKey(), new ListenerConfig((Map<String, Object>)entry.getValue()));
+ }
+ }
+ }
+ return listeners;
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
index f304ba7..7a46fc7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
@@ -24,12 +24,12 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -62,7 +62,7 @@ public class NodeAddedTrigger extends TriggerBase {
private Set<String> lastLiveNodes;
- private Map<String, Long> nodeNameVsTimeAdded = new ConcurrentHashMap<>();
+ private Map<String, Long> nodeNameVsTimeAdded = new HashMap<>();
public NodeAddedTrigger(String name, Map<String, Object> properties,
CoreContainer container) {
@@ -99,6 +99,20 @@ public class NodeAddedTrigger extends TriggerBase {
actions.get(i).init(map);
}
}
+ // pick up added nodes for which marker paths were created
+ try {
+ List<String> added = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true);
+ added.forEach(n -> {
+ log.debug("Adding node from marker path: {}", n);
+ nodeNameVsTimeAdded.put(n, timeSource.getTime());
+ removeNodeAddedMarker(n);
+ });
+ } catch (KeeperException.NoNodeException e) {
+ // ignore
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Exception retrieving nodeLost markers", e);
+ }
+
}
@Override
@@ -225,28 +239,13 @@ public class NodeAddedTrigger extends TriggerBase {
copyOfNew.removeAll(lastLiveNodes);
copyOfNew.forEach(n -> {
long eventTime = timeSource.getTime();
- nodeNameVsTimeAdded.put(n, eventTime);
log.debug("Tracking new node: {} at time {}", n, eventTime);
+ nodeNameVsTimeAdded.put(n, eventTime);
});
- // pick up added nodes for which marker paths were created
- try {
- List<String> lost = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true);
- lost.forEach(n -> {
- log.debug("Adding node from marker path: {}", n);
- nodeNameVsTimeAdded.put(n, timeSource.getTime());
- try {
- container.getZkController().getZkClient().delete(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + n, -1, true);
- } catch (KeeperException | InterruptedException e) {
- log.debug("Exception removing nodeAdded marker " + n, e);
- }
- });
- } catch (KeeperException | InterruptedException e) {
- log.warn("Exception retrieving nodeLost markers", e);
- }
-
// has enough time expired to trigger events for a node?
- for (Map.Entry<String, Long> entry : nodeNameVsTimeAdded.entrySet()) {
+ for (Iterator<Map.Entry<String, Long>> it = nodeNameVsTimeAdded.entrySet().iterator(); it.hasNext(); ) {
+ Map.Entry<String, Long> entry = it.next();
String nodeName = entry.getKey();
Long timeAdded = entry.getValue();
long now = timeSource.getTime();
@@ -257,20 +256,35 @@ public class NodeAddedTrigger extends TriggerBase {
log.debug("NodeAddedTrigger {} firing registered listener for node: {} added at time {} , now: {}", name, nodeName, timeAdded, now);
if (listener.triggerFired(new NodeAddedEvent(getEventType(), getName(), timeAdded, nodeName))) {
// remove from tracking set only if the fire was accepted
- trackingKeySet.remove(nodeName);
+ it.remove();
+ removeNodeAddedMarker(nodeName);
}
} else {
- trackingKeySet.remove(nodeName);
+ it.remove();
+ removeNodeAddedMarker(nodeName);
}
}
}
-
lastLiveNodes = new HashSet(newLiveNodes);
} catch (RuntimeException e) {
log.error("Unexpected exception in NodeAddedTrigger", e);
}
}
+ private void removeNodeAddedMarker(String nodeName) {
+ String path = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeName;
+ try {
+ if (container.getZkController().getZkClient().exists(path, true)) {
+ container.getZkController().getZkClient().delete(path, -1, true);
+ }
+ } catch (KeeperException.NoNodeException e) {
+ // ignore
+ } catch (KeeperException | InterruptedException e) {
+ log.debug("Exception removing nodeAdded marker " + nodeName, e);
+ }
+
+ }
+
@Override
public boolean isClosed() {
synchronized (this) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
index 6450bda..2af4cc5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -24,12 +24,12 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -62,7 +62,7 @@ public class NodeLostTrigger extends TriggerBase {
private Set<String> lastLiveNodes;
- private Map<String, Long> nodeNameVsTimeRemoved = new ConcurrentHashMap<>();
+ private Map<String, Long> nodeNameVsTimeRemoved = new HashMap<>();
public NodeLostTrigger(String name, Map<String, Object> properties,
CoreContainer container) {
@@ -98,6 +98,19 @@ public class NodeLostTrigger extends TriggerBase {
actions.get(i).init(map);
}
}
+ // pick up lost nodes for which marker paths were created
+ try {
+ List<String> lost = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true);
+ lost.forEach(n -> {
+ log.debug("Adding lost node from marker path: {}", n);
+ nodeNameVsTimeRemoved.put(n, timeSource.getTime());
+ removeNodeLostMarker(n);
+ });
+ } catch (KeeperException.NoNodeException e) {
+ // ignore
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Exception retrieving nodeLost markers", e);
+ }
}
@Override
@@ -227,24 +240,9 @@ public class NodeLostTrigger extends TriggerBase {
nodeNameVsTimeRemoved.put(n, timeSource.getTime());
});
- // pick up lost nodes for which marker paths were created
- try {
- List<String> lost = container.getZkController().getZkClient().getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true);
- lost.forEach(n -> {
- log.debug("Adding lost node from marker path: {}", n);
- nodeNameVsTimeRemoved.put(n, timeSource.getTime());
- try {
- container.getZkController().getZkClient().delete(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + n, -1, true);
- } catch (KeeperException | InterruptedException e) {
- log.warn("Exception removing nodeLost marker " + n, e);
- }
- });
- } catch (KeeperException | InterruptedException e) {
- log.warn("Exception retrieving nodeLost markers", e);
- }
-
// has enough time expired to trigger events for a node?
- for (Map.Entry<String, Long> entry : nodeNameVsTimeRemoved.entrySet()) {
+ for (Iterator<Map.Entry<String, Long>> it = nodeNameVsTimeRemoved.entrySet().iterator(); it.hasNext(); ) {
+ Map.Entry<String, Long> entry = it.next();
String nodeName = entry.getKey();
Long timeRemoved = entry.getValue();
if (TimeUnit.SECONDS.convert(timeSource.getTime() - timeRemoved, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
@@ -253,20 +251,34 @@ public class NodeLostTrigger extends TriggerBase {
if (listener != null) {
log.debug("NodeLostTrigger firing registered listener");
if (listener.triggerFired(new NodeLostEvent(getEventType(), getName(), timeRemoved, nodeName))) {
- trackingKeySet.remove(nodeName);
+ it.remove();
+ removeNodeLostMarker(nodeName);
}
} else {
- trackingKeySet.remove(nodeName);
+ it.remove();
+ removeNodeLostMarker(nodeName);
}
}
}
-
lastLiveNodes = new HashSet<>(newLiveNodes);
} catch (RuntimeException e) {
log.error("Unexpected exception in NodeLostTrigger", e);
}
}
+ private void removeNodeLostMarker(String nodeName) {
+ String path = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeName;
+ try {
+ if (container.getZkController().getZkClient().exists(path, true)) {
+ container.getZkController().getZkClient().delete(path, -1, true);
+ }
+ } catch (KeeperException.NoNodeException e) {
+ // ignore
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Exception removing nodeLost marker " + nodeName, e);
+ }
+ }
+
@Override
public boolean isClosed() {
synchronized (this) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
index 4a89ce7..91146b6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
@@ -35,6 +35,7 @@ import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.util.IOUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
@@ -169,10 +170,59 @@ public class OverseerTriggerThread implements Runnable, Closeable {
scheduledTriggers.remove(managedTriggerName);
}
}
+ // check for nodeLost / nodeAdded triggers in the current config; for each
+ // event type that has no trigger, clean up its old markers (nothing would consume them)
+ boolean cleanOldNodeLostMarkers = true;
+ boolean cleanOldNodeAddedMarkers = true;
// add new triggers and/or replace and close the replaced triggers
for (Map.Entry<String, AutoScaling.Trigger> entry : copy.entrySet()) {
+ if (entry.getValue().getEventType().equals(AutoScaling.EventType.NODELOST)) {
+ cleanOldNodeLostMarkers = false;
+ }
+ if (entry.getValue().getEventType().equals(AutoScaling.EventType.NODEADDED)) {
+ cleanOldNodeAddedMarkers = false;
+ }
scheduledTriggers.add(entry.getValue());
}
+ if (cleanOldNodeLostMarkers) {
+ log.debug("-- clean old nodeLost markers");
+ try {
+ List<String> markers = zkClient.getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, null, true);
+ markers.forEach(n -> {
+ removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, n);
+ });
+ } catch (KeeperException.NoNodeException e) {
+ // ignore
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Error removing old nodeLost markers", e);
+ }
+ }
+ if (cleanOldNodeAddedMarkers) {
+ log.debug("-- clean old nodeAdded markers");
+ try {
+ List<String> markers = zkClient.getChildren(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, null, true);
+ markers.forEach(n -> {
+ removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, n);
+ });
+ } catch (KeeperException.NoNodeException e) {
+ // ignore
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Error removing old nodeAdded markers", e);
+ }
+
+ }
+ }
+ }
+
+ private void removeNodeMarker(String path, String nodeName) {
+ path = path + "/" + nodeName;
+ try {
+ zkClient.delete(path, -1, true);
+ log.debug(" -- deleted " + path);
+ } catch (KeeperException.NoNodeException e) {
+ // ignore
+ } catch (KeeperException | InterruptedException e) {
+ log.warn("Error removing old marker " + path, e);
}
}
@@ -250,7 +300,7 @@ public class OverseerTriggerThread implements Runnable, Closeable {
for (Map.Entry<String, Object> entry : triggers.entrySet()) {
Map<String, Object> props = (Map<String, Object>) entry.getValue();
- String event = (String) props.get("event");
+ String event = (String) props.get(AutoScalingParams.EVENT);
AutoScaling.EventType eventType = AutoScaling.EventType.valueOf(event.toUpperCase(Locale.ROOT));
String triggerName = entry.getKey();
triggerMap.put(triggerName, triggerFactory.create(eventType, triggerName, props));
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
index ef9a3cf..7aff846 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
@@ -97,11 +97,11 @@ public abstract class TriggerBase implements AutoScaling.Trigger {
LOG.warn("Exception getting trigger state '" + path + "'", e);
}
if (data != null) {
- Map<String, Object> state = (Map<String, Object>)Utils.fromJSON(data);
+ Map<String, Object> restoredState = (Map<String, Object>)Utils.fromJSON(data);
// make sure lastState is sorted
- state = Utils.getDeepCopy(state, 10, false, true);;
- setState(state);
- lastState = state;
+ restoredState = Utils.getDeepCopy(restoredState, 10, false, true);
+ setState(restoredState);
+ lastState = restoredState;
}
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
index 5038278..a809873 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TestPolicyCloud.java
@@ -30,7 +30,6 @@ import org.apache.solr.cloud.OverseerTaskProcessor;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.BeforeClass;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/b67de922/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index b2c95b7..ae3f72d 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -139,6 +139,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
paths.forEach(n -> {
try {
ZKUtil.deleteRecursive(zkClient().getSolrZooKeeper(), path + "/" + n);
+ } catch (KeeperException.NoNodeException e) {
+ // ignore
} catch (KeeperException | InterruptedException e) {
log.warn("Error deleting old data", e);
}
@@ -634,10 +636,11 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
@Override
public void process(TriggerEvent event) {
+ log.info("-- event: " + event);
events.add(event);
getActionStarted().countDown();
try {
- Thread.sleep(5000);
+ Thread.sleep(eventQueueActionWait);
triggerFired.compareAndSet(false, true);
getActionCompleted().countDown();
} catch (InterruptedException e) {
@@ -658,6 +661,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
}
+ public static long eventQueueActionWait = 5000;
+
@Test
public void testEventQueue() throws Exception {
CloudSolrClient solrClient = cluster.getSolrClient();
@@ -696,19 +701,20 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertNotNull(nodeAddedEvent);
// but action did not complete yet so the event is still enqueued
assertFalse(triggerFired.get());
+ events.clear();
actionStarted = new CountDownLatch(1);
+ eventQueueActionWait = 1;
// kill overseer leader
cluster.stopJettySolrRunner(overseerLeaderIndex);
Thread.sleep(5000);
+ // new overseer leader should be elected and run triggers
await = actionInterrupted.await(3, TimeUnit.SECONDS);
assertTrue("action wasn't interrupted", await);
- // new overseer leader should be elected and run triggers
- newNode = cluster.startJettySolrRunner();
- // it should fire again but not complete yet
+ // it should fire again from enqueued event
await = actionStarted.await(60, TimeUnit.SECONDS);
TriggerEvent replayedEvent = events.iterator().next();
assertTrue(replayedEvent.getProperty(TriggerEventQueue.ENQUEUE_TIME) != null);
- assertTrue(replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME) != null);
+ assertTrue(events + "\n" + replayedEvent.toString(), replayedEvent.getProperty(TriggerEventQueue.DEQUEUE_TIME) != null);
await = actionCompleted.await(10, TimeUnit.SECONDS);
assertTrue(triggerFired.get());
}
@@ -743,6 +749,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
}
+ events.clear();
+
JettySolrRunner newNode = cluster.startJettySolrRunner();
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
assertTrue("The trigger did not fire at all", await);
@@ -797,8 +805,6 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
public static class TestEventMarkerAction implements TriggerAction {
- // sanity check that an action instance is only invoked once
- private final AtomicBoolean onlyOnce = new AtomicBoolean(false);
public TestEventMarkerAction() {
actionConstructorCalled.countDown();
@@ -806,7 +812,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
@Override
public String getName() {
- return "TestTriggerAction";
+ return "TestEventMarkerAction";
}
@Override
@@ -841,17 +847,17 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
@Override
public void init(Map<String, String> args) {
- log.info("TestTriggerAction init");
+ log.info("TestEventMarkerAction init");
actionInitCalled.countDown();
}
}
@Test
- public void testNodeEventsRegistration() throws Exception {
+ public void testNodeMarkersRegistration() throws Exception {
// for this test we want to create two triggers so we must assert that the actions were created twice
actionInitCalled = new CountDownLatch(2);
// similarly we want both triggers to fire
- triggerFiredLatch = new CountDownLatch(3);
+ triggerFiredLatch = new CountDownLatch(2);
TestLiveNodesListener listener = registerLiveNodesListener();
NamedList<Object> overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
@@ -875,7 +881,8 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node.getNodeName();
assertTrue("Path " + pathAdded + " wasn't created", zkClient().exists(pathAdded, true));
listener.reset();
- // stop overseer, which should also cause nodeLost event
+ // stop overseer
+ log.info("====== KILL OVERSEER 1");
cluster.stopJettySolrRunner(overseerLeaderIndex);
if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
fail("onChange listener didn't execute on cluster change");
@@ -883,12 +890,16 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
assertEquals(1, listener.lostNodes.size());
assertEquals(overseerLeader, listener.lostNodes.iterator().next());
assertEquals(0, listener.addedNodes.size());
- // verify that a znode exists
+ // wait until the new overseer is up
+ Thread.sleep(5000);
+ // verify that a znode does NOT exist - there's no nodeLost trigger,
+ // so the new overseer cleaned up existing nodeLost markers
String pathLost = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + overseerLeader;
- assertTrue("Path " + pathLost + " wasn't created", zkClient().exists(pathLost, true));
+ assertFalse("Path " + pathLost + " exists", zkClient().exists(pathLost, true));
listener.reset();
// create another node
+ log.info("====== ADD NODE 1");
JettySolrRunner node1 = cluster.startJettySolrRunner();
if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
fail("onChange listener didn't execute on cluster change");
@@ -902,6 +913,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
// set up triggers
CloudSolrClient solrClient = cluster.getSolrClient();
+ log.info("====== ADD TRIGGERS");
String setTriggerCommand = "{" +
"'set-trigger' : {" +
"'name' : 'node_added_trigger'," +
@@ -926,25 +938,40 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
response = solrClient.request(req);
assertEquals(response.get("result").toString(), "success");
- if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
- fail("Both triggers should have fired by now");
- }
- assertEquals(3, events.size());
- // 2 nodeAdded, 1 nodeLost
- int nodeAdded = 0;
- int nodeLost = 0;
- for (TriggerEvent ev : events) {
- String nodeName = (String)ev.getProperty(TriggerEvent.NODE_NAME);
- if (ev.eventType.equals(AutoScaling.EventType.NODELOST)) {
- assertEquals(overseerLeader, nodeName);
- nodeLost++;
- } else if (ev.eventType.equals(AutoScaling.EventType.NODEADDED)) {
- assertTrue(nodeName + " not one of: " + node.getNodeName() + ", " + node1.getNodeName(),
- nodeName.equals(node.getNodeName()) || nodeName.equals(node1.getNodeName()));
- nodeAdded++;
+ overSeerStatus = cluster.getSolrClient().request(CollectionAdminRequest.getOverseerStatus());
+ overseerLeader = (String) overSeerStatus.get("leader");
+ overseerLeaderIndex = 0;
+ for (int i = 0; i < cluster.getJettySolrRunners().size(); i++) {
+ JettySolrRunner jetty = cluster.getJettySolrRunner(i);
+ if (jetty.getNodeName().equals(overseerLeader)) {
+ overseerLeaderIndex = i;
+ break;
}
}
- assertEquals(1, nodeLost);
- assertEquals(2, nodeAdded);
+
+ Thread.sleep(5000);
+ // old nodeAdded markers should be consumed now by nodeAdded trigger
+ // but it doesn't result in new events because all nodes have been added
+ // before we configured the trigger
+ assertFalse("Path " + pathAdded + " should have been deleted", zkClient().exists(pathAdded, true));
+
+ listener.reset();
+ events.clear();
+ triggerFiredLatch = new CountDownLatch(1);
+ // kill overseer again
+ log.info("====== KILL OVERSEER 2");
+ cluster.stopJettySolrRunner(overseerLeaderIndex);
+ if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+ fail("onChange listener didn't execute on cluster change");
+ }
+
+
+ if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
+ fail("Trigger should have fired by now");
+ }
+ assertEquals(1, events.size());
+ TriggerEvent ev = events.iterator().next();
+ assertEquals(overseerLeader, ev.getProperty(TriggerEvent.NODE_NAME));
+ assertEquals(AutoScaling.EventType.NODELOST, ev.getEventType());
}
}