You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2018/12/20 12:16:58 UTC
lucene-solr:master: SOLR-13072: Management of markers for nodeLost /
nodeAdded events is broken.
Repository: lucene-solr
Updated Branches:
refs/heads/master 71f024ac8 -> 1f0e875db
SOLR-13072: Management of markers for nodeLost / nodeAdded events is broken.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/1f0e875d
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/1f0e875d
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/1f0e875d
Branch: refs/heads/master
Commit: 1f0e875db65a0a2e9a8a62757aff1770ecf99866
Parents: 71f024a
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Thu Dec 20 13:16:28 2018 +0100
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Thu Dec 20 13:16:28 2018 +0100
----------------------------------------------------------------------
solr/CHANGES.txt | 3 +
.../org/apache/solr/cloud/ZkController.java | 8 +-
.../solr/cloud/autoscaling/AutoScaling.java | 4 +
.../autoscaling/InactiveMarkersPlanAction.java | 134 +++++++++++++++++++
.../cloud/autoscaling/NodeAddedTrigger.java | 21 +--
.../solr/cloud/autoscaling/NodeLostTrigger.java | 20 +--
.../autoscaling/OverseerTriggerThread.java | 55 ++------
.../cloud/autoscaling/ScheduledTriggers.java | 5 +
.../cloud/autoscaling/NodeAddedTriggerTest.java | 9 +-
.../cloud/autoscaling/NodeLostTriggerTest.java | 13 +-
.../NodeMarkersRegistrationTest.java | 34 ++++-
.../ScheduledMaintenanceTriggerTest.java | 114 +++++++++++++++-
.../sim/SimClusterStateProvider.java | 17 ++-
13 files changed, 330 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f0e875d/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 9c58be6..e77b564 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -179,6 +179,9 @@ Bug Fixes
* SOLR-10975: New Admin UI Query does not URL-encode the query produced in the URL box (janhoy)
+* SOLR-13072: Management of markers for nodeLost / nodeAdded events is broken. This bug could have caused
+ some events to be lost if they coincided with an Overseer leader crash. (ab)
+
Improvements
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f0e875d/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index b215048..a81481b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -98,6 +98,7 @@ import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.SolrjNamedThreadFactory;
import org.apache.solr.common.util.StrUtils;
+import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.URLUtil;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CloseHook;
@@ -973,10 +974,12 @@ public class ZkController implements Closeable {
log.warn("Unable to read autoscaling.json", e1);
}
if (createNodes) {
+ byte[] json = Utils.toJSON(Collections.singletonMap("timestamp", cloudManager.getTimeSource().getEpochTimeNs()));
for (String n : oldNodes) {
String path = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + n;
+
try {
- zkClient.create(path, null, CreateMode.PERSISTENT, true);
+ zkClient.create(path, json, CreateMode.PERSISTENT, true);
} catch (KeeperException.NodeExistsException e) {
// someone else already created this node - ignore
} catch (KeeperException | InterruptedException e1) {
@@ -1078,7 +1081,8 @@ public class ZkController implements Closeable {
if (createMarkerNode && !zkClient.exists(nodeAddedPath, true)) {
// use EPHEMERAL so that it disappears if this node goes down
// and no other action is taken
- ops.add(Op.create(nodeAddedPath, null, zkClient.getZkACLProvider().getACLsToAdd(nodeAddedPath), CreateMode.EPHEMERAL));
+ byte[] json = Utils.toJSON(Collections.singletonMap("timestamp", TimeSource.NANO_TIME.getEpochTimeNs()));
+ ops.add(Op.create(nodeAddedPath, json, zkClient.getZkACLProvider().getACLsToAdd(nodeAddedPath), CreateMode.EPHEMERAL));
}
zkClient.multi(ops, true);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f0e875d/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
index 93f449a..7b2fee7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
@@ -229,6 +229,10 @@ public class AutoScaling {
" 'class':'solr.InactiveShardPlanAction'" +
" }," +
" {" +
+ " 'name':'inactive_markers_plan'," +
+ " 'class':'solr.InactiveMarkersPlanAction'" +
+ " }," +
+ " {" +
" 'name':'execute_plan'," +
" 'class':'solr.ExecutePlanAction'" +
" }" +
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f0e875d/solr/core/src/java/org/apache/solr/cloud/autoscaling/InactiveMarkersPlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/InactiveMarkersPlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/InactiveMarkersPlanAction.java
new file mode 100644
index 0000000..b499d8d
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/InactiveMarkersPlanAction.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.solr.client.solrj.cloud.DistribStateManager;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
+import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.SolrResourceLoader;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This action removes nodeAdded and nodeLost markers from ZooKeeper once their TTL has
+ * expired. These markers are used by {@link NodeAddedTrigger} and {@link NodeLostTrigger} to
+ * ensure fault tolerance in case of Overseer leader crash. Markers whose JSON payload has no
+ * numeric {@code timestamp} field are left untouched.
+ */
+public class InactiveMarkersPlanAction extends TriggerActionBase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** Name of the configuration property holding the marker TTL, in seconds. */
+  public static final String TTL_PROP = "ttl";
+
+  /** Default marker TTL in seconds (2 days). */
+  public static final int DEFAULT_TTL_SECONDS = 3600 * 24 * 2;
+
+  /** Marker TTL in seconds; markers older than this are removed. */
+  private int cleanupTTL;
+
+  public InactiveMarkersPlanAction() {
+    super();
+    TriggerUtils.validProperties(validProperties, TTL_PROP);
+  }
+
+  /**
+   * Reads and validates the {@link #TTL_PROP} property (default {@link #DEFAULT_TTL_SECONDS}).
+   *
+   * @throws TriggerValidationException if the TTL is not an integer or is negative
+   */
+  @Override
+  public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException {
+    super.configure(loader, cloudManager, properties);
+    String cleanupStr = String.valueOf(properties.getOrDefault(TTL_PROP, String.valueOf(DEFAULT_TTL_SECONDS)));
+    try {
+      cleanupTTL = Integer.parseInt(cleanupStr);
+    } catch (Exception e) {
+      throw new TriggerValidationException(getName(), TTL_PROP, "invalid value '" + cleanupStr + "': " + e.toString());
+    }
+    // a TTL of 0 is allowed; only negative values are rejected
+    if (cleanupTTL < 0) {
+      throw new TriggerValidationException(getName(), TTL_PROP, "invalid value '" + cleanupStr + "', should be >= 0.");
+    }
+  }
+
+  /**
+   * Removes expired markers under both the nodeAdded and nodeLost marker paths. Names of the
+   * removed markers are stored in the context properties under this action's name, keyed by
+   * "nodeAdded" / "nodeLost"; nothing is stored when no marker was removed.
+   */
+  @Override
+  public void process(TriggerEvent event, ActionContext context) throws Exception {
+    log.trace("-- {} cleaning markers", getName());
+    // use epoch time to track this across JVMs and nodes
+    long currentTimeNs = cloudManager.getTimeSource().getEpochTimeNs();
+    Map<String, Object> results = new LinkedHashMap<>();
+    Set<String> cleanedUp = new TreeSet<>();
+    cleanupMarkers(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, currentTimeNs, cleanedUp);
+    if (!cleanedUp.isEmpty()) {
+      results.put("nodeAdded", cleanedUp);
+      cleanedUp = new TreeSet<>();
+    }
+    cleanupMarkers(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, currentTimeNs, cleanedUp);
+    if (!cleanedUp.isEmpty()) {
+      results.put("nodeLost", cleanedUp);
+    }
+    if (!results.isEmpty()) {
+      context.getProperties().put(getName(), results);
+    }
+  }
+
+  /**
+   * Removes the children of {@code path} whose "timestamp" (epoch nanoseconds) is more than
+   * {@code cleanupTTL} seconds older than {@code currentTimeNs}.
+   *
+   * @param path parent znode of the markers
+   * @param currentTimeNs current epoch time in nanoseconds
+   * @param cleanedUp receives the names of the markers that were removed
+   */
+  private void cleanupMarkers(String path, long currentTimeNs, Set<String> cleanedUp) throws Exception {
+    DistribStateManager stateManager = cloudManager.getDistribStateManager();
+    if (!stateManager.hasData(path)) {
+      return;
+    }
+    List<String> markers = stateManager.listData(path);
+    // plain loop rather than forEach, so that an interrupt stops processing the remaining markers
+    for (String m : markers) {
+      String markerPath = path + "/" + m;
+      try {
+        Map<String, Object> payload = Utils.getJson(stateManager, markerPath);
+        Object timestampObj = payload.get("timestamp");
+        // skip markers without a numeric timestamp (e.g. ones created with no data) instead of
+        // failing on them, which would otherwise abort the cleanup of all remaining markers
+        if (!(timestampObj instanceof Number)) {
+          log.trace(" -- ignore {}: either missing or unsupported format", markerPath);
+          continue;
+        }
+        long timestamp = ((Number) timestampObj).longValue();
+        long delta = TimeUnit.NANOSECONDS.toSeconds(currentTimeNs - timestamp);
+        if (delta > cleanupTTL) {
+          try {
+            stateManager.removeData(markerPath, -1);
+            log.trace(" -- remove {}, delta={}, ttl={}", markerPath, delta, cleanupTTL);
+            cleanedUp.add(m);
+          } catch (NoSuchElementException nse) {
+            // someone already removed it - ignore
+          } catch (BadVersionException be) {
+            // version -1 ("any version") is passed above, so this is not expected
+            throw new RuntimeException("should never happen", be);
+          } catch (NotEmptyException ne) {
+            log.error("Marker znode should be empty but it's not! Ignoring {} ({})", markerPath, ne.toString());
+          }
+        } else {
+          log.trace(" -- keep {}, delta={}, ttl={}", markerPath, delta, cleanupTTL);
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return;
+      } catch (IOException | KeeperException e) {
+        log.warn("Could not cleanup marker at {}, skipping... ({})", markerPath, e.getMessage());
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f0e875d/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
index fe49891..b05e8fd 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
@@ -74,7 +74,6 @@ public class NodeAddedTrigger extends TriggerBase {
log.debug("Adding node from marker path: {}", n);
nodeNameVsTimeAdded.put(n, cloudManager.getTimeSource().getTimeNs());
}
- removeMarker(n);
});
} catch (NoSuchElementException e) {
// ignore
@@ -187,14 +186,15 @@ public class NodeAddedTrigger extends TriggerBase {
if (processor.process(new NodeAddedEvent(getEventType(), getName(), times, nodeNames, preferredOp))) {
// remove from tracking set only if the fire was accepted
nodeNames.forEach(n -> {
+ log.debug("Removing new node from tracking: {}", n);
nodeNameVsTimeAdded.remove(n);
- removeMarker(n);
});
+ } else {
+ log.debug("Processor returned false for {}!", nodeNames);
}
} else {
nodeNames.forEach(n -> {
nodeNameVsTimeAdded.remove(n);
- removeMarker(n);
});
}
}
@@ -204,21 +204,6 @@ public class NodeAddedTrigger extends TriggerBase {
}
}
- private void removeMarker(String nodeName) {
- String path = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeName;
- try {
- log.debug("NodeAddedTrigger {} - removing marker path: {}", name, path);
- if (stateManager.hasData(path)) {
- stateManager.removeData(path, -1);
- }
- } catch (NoSuchElementException e) {
- // ignore
- } catch (Exception e) {
- log.debug("Exception removing nodeAdded marker " + nodeName, e);
- }
-
- }
-
public static class NodeAddedEvent extends TriggerEvent {
public NodeAddedEvent(TriggerEventType eventType, String source, List<Long> times, List<String> nodeNames, String preferredOp) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f0e875d/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
index 73c8ade..047db90 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -63,7 +63,7 @@ public class NodeLostTrigger extends TriggerBase {
public void init() throws Exception {
super.init();
lastLiveNodes = new HashSet<>(cloudManager.getClusterStateProvider().getLiveNodes());
- log.info("NodeLostTrigger {} - Initial livenodes: {}", name, lastLiveNodes);
+ log.debug("NodeLostTrigger {} - Initial livenodes: {}", name, lastLiveNodes);
// pick up lost nodes for which marker paths were created
try {
List<String> lost = stateManager.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
@@ -74,7 +74,6 @@ public class NodeLostTrigger extends TriggerBase {
log.debug("Adding lost node from marker path: {}", n);
nodeNameVsTimeRemoved.put(n, cloudManager.getTimeSource().getTimeNs());
}
- removeMarker(n);
});
} catch (NoSuchElementException e) {
// ignore
@@ -160,7 +159,7 @@ public class NodeLostTrigger extends TriggerBase {
Set<String> copyOfLastLiveNodes = new HashSet<>(lastLiveNodes);
copyOfLastLiveNodes.removeAll(newLiveNodes);
copyOfLastLiveNodes.forEach(n -> {
- log.info("Tracking lost node: {}", n);
+ log.debug("Tracking lost node: {}", n);
nodeNameVsTimeRemoved.put(n, cloudManager.getTimeSource().getTimeNs());
});
@@ -187,7 +186,6 @@ public class NodeLostTrigger extends TriggerBase {
// remove from tracking set only if the fire was accepted
nodeNames.forEach(n -> {
nodeNameVsTimeRemoved.remove(n);
- removeMarker(n);
});
} else {
log.debug("NodeLostTrigger processor for lost nodes: {} is not ready, will try later", nodeNames);
@@ -195,7 +193,6 @@ public class NodeLostTrigger extends TriggerBase {
} else {
nodeNames.forEach(n -> {
nodeNameVsTimeRemoved.remove(n);
- removeMarker(n);
});
}
}
@@ -207,19 +204,6 @@ public class NodeLostTrigger extends TriggerBase {
}
}
- private void removeMarker(String nodeName) {
- String path = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeName;
- try {
- if (stateManager.hasData(path)) {
- stateManager.removeData(path, -1);
- }
- } catch (NoSuchElementException e) {
- // ignore
- } catch (Exception e) {
- log.warn("Exception removing nodeLost marker " + nodeName, e);
- }
- }
-
public static class NodeLostEvent extends TriggerEvent {
public NodeLostEvent(TriggerEventType eventType, String source, List<Long> times, List<String> nodeNames, String preferredOp) {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f0e875d/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
index 6288e40..7e36378 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
@@ -22,7 +22,6 @@ import java.lang.invoke.MethodHandles;
import java.net.ConnectException;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
@@ -31,7 +30,6 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
-import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.AlreadyClosedException;
@@ -226,19 +224,11 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {
scheduledTriggers.remove(managedTriggerName);
}
}
- // check for nodeLost triggers in the current config, and if
- // absent then clean up old nodeLost / nodeAdded markers
- boolean cleanOldNodeLostMarkers = true;
- boolean cleanOldNodeAddedMarkers = true;
+ // nodeLost / nodeAdded markers are checked by triggers during their init() call
+ // which is invoked in scheduledTriggers.add(), so once this is done we can remove them
try {
// add new triggers and/or replace and close the replaced triggers
for (Map.Entry<String, AutoScaling.Trigger> entry : copy.entrySet()) {
- if (entry.getValue().getEventType().equals(TriggerEventType.NODELOST)) {
- cleanOldNodeLostMarkers = false;
- }
- if (entry.getValue().getEventType().equals(TriggerEventType.NODEADDED)) {
- cleanOldNodeAddedMarkers = false;
- }
try {
scheduledTriggers.add(entry.getValue());
} catch (AlreadyClosedException e) {
@@ -255,48 +245,19 @@ public class OverseerTriggerThread implements Runnable, SolrCloseable {
throw new IllegalStateException("Caught AlreadyClosedException from ScheduledTriggers, but we're not closed yet!", e);
}
}
- DistribStateManager stateManager = cloudManager.getDistribStateManager();
- if (cleanOldNodeLostMarkers) {
- log.debug("-- clean old nodeLost markers");
- try {
- List<String> markers = stateManager.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
- markers.forEach(n -> {
- removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, n);
- });
- } catch (NoSuchElementException e) {
- // ignore
- } catch (Exception e) {
- log.warn("Error removing old nodeLost markers", e);
- }
- }
- if (cleanOldNodeAddedMarkers) {
- log.debug("-- clean old nodeAdded markers");
- try {
- List<String> markers = stateManager.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
- markers.forEach(n -> {
- removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, n);
- });
- } catch (NoSuchElementException e) {
- // ignore
- } catch (AlreadyClosedException e) {
-
- } catch (Exception e) {
- log.warn("Error removing old nodeAdded markers", e);
- }
-
- }
+ log.debug("-- cleaning old nodeLost / nodeAdded markers");
+ removeMarkers(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
+ removeMarkers(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
}
}
- private void removeNodeMarker(String path, String nodeName) {
- path = path + "/" + nodeName;
+ private void removeMarkers(String path) {
try {
- cloudManager.getDistribStateManager().removeData(path, -1);
- log.debug(" -- deleted " + path);
+ cloudManager.getDistribStateManager().removeRecursively(path, true, false);
} catch (NoSuchElementException e) {
// ignore
} catch (Exception e) {
- log.warn("Error removing old marker " + path, e);
+ log.warn("Error removing old markers", e);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f0e875d/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
index b9cd9f1..cad463b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
@@ -267,6 +267,8 @@ public class ScheduledTriggers implements Closeable {
return false;
} else {
log.debug("++++++++ Cooldown inactive - processing event: " + event);
+ // start cooldown here to immediately reject other events
+ cooldownStart.set(cloudManager.getTimeSource().getTimeNs());
}
if (hasPendingActions.compareAndSet(false, true)) {
// pause all triggers while we execute actions so triggers do not operate on a cluster in transition
@@ -286,6 +288,7 @@ public class ScheduledTriggers implements Closeable {
String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s from trigger %s because the executor has already been closed", event.toString(), source);
triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
log.warn(msg);
+ hasPendingActions.set(false);
// we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
return false;
}
@@ -325,6 +328,7 @@ public class ScheduledTriggers implements Closeable {
triggerListeners1.fireListeners(event.getSource(), event, TriggerEventProcessorStage.FAILED);
log.warn("Unhandled exception executing actions", e);
} finally {
+ // update cooldown to the time when we actually finished processing the actions
cooldownStart.set(cloudManager.getTimeSource().getTimeNs());
hasPendingActions.set(false);
// resume triggers after cool down period
@@ -348,6 +352,7 @@ public class ScheduledTriggers implements Closeable {
}
return true;
} else {
+ log.debug("Ignoring event {}, already processing other actions.", event.id);
// there is an action in the queue and we don't want to enqueue another until it is complete
triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.IGNORED, "Already processing another event.");
return false;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f0e875d/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
index de4af4b..a2b820f 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrResourceLoader;
import org.junit.After;
@@ -78,7 +79,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
long waitForSeconds = 1 + random().nextInt(5);
Map<String, Object> props = createTriggerProps(waitForSeconds);
- try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger")) {
+ try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger1")) {
final SolrCloudManager cloudManager = container.getZkController().getSolrCloudManager();
trigger.configure(container.getResourceLoader(), cloudManager, props);
trigger.init();
@@ -122,9 +123,13 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
assertTrue(nodeNames.contains(newNode2.getNodeName()));
}
+ // clean nodeAdded markers - normally done by OverseerTriggerThread
+ container.getZkController().getSolrCloudManager().getDistribStateManager()
+ .removeRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, true, false);
+
// add a new node but remove it before the waitFor period expires
// and assert that the trigger doesn't fire at all
- try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger")) {
+ try (NodeAddedTrigger trigger = new NodeAddedTrigger("node_added_trigger2")) {
final SolrCloudManager cloudManager = container.getZkController().getSolrCloudManager();
trigger.configure(container.getResourceLoader(), cloudManager, props);
trigger.init();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f0e875d/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
index c974de0..bf55a85ac 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrResourceLoader;
@@ -78,7 +79,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
long waitForSeconds = 1 + random().nextInt(5);
Map<String, Object> props = createTriggerProps(waitForSeconds);
- try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger")) {
+ try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger1")) {
final SolrCloudManager cloudManager = container.getZkController().getSolrCloudManager();
trigger.configure(container.getResourceLoader(), cloudManager, props);
trigger.init();
@@ -125,9 +126,13 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
}
+ // clean nodeLost markers - normally done by OverseerTriggerThread
+ container.getZkController().getSolrCloudManager().getDistribStateManager()
+ .removeRecursively(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, true, false);
+
// remove a node but add it back before the waitFor period expires
// and assert that the trigger doesn't fire at all
- try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger")) {
+ try (NodeLostTrigger trigger = new NodeLostTrigger("node_lost_trigger2")) {
final SolrCloudManager cloudManager = container.getZkController().getSolrCloudManager();
trigger.configure(container.getResourceLoader(), cloudManager, props);
final long waitTime = 2;
@@ -140,8 +145,10 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
String lostNodeName = lostNode.getNodeName();
lostNode.stop();
AtomicBoolean fired = new AtomicBoolean(false);
+ AtomicReference<TriggerEvent> eventRef = new AtomicReference<>();
trigger.setProcessor(event -> {
if (fired.compareAndSet(false, true)) {
+ eventRef.set(event);
long currentTimeNanos = cloudManager.getTimeSource().getTimeNs();
long eventTimeNanos = event.getEventTime();
long waitForNanos = TimeUnit.NANOSECONDS.convert(waitTime, TimeUnit.SECONDS) - WAIT_FOR_DELTA_NANOS;
@@ -175,7 +182,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
} while (true);
// ensure the event was not fired
- assertFalse(fired.get());
+ assertFalse("event was fired: " + eventRef.get(), fired.get());
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f0e875d/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeMarkersRegistrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeMarkersRegistrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeMarkersRegistrationTest.java
index 5fe344d..9d09ac1 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeMarkersRegistrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeMarkersRegistrationTest.java
@@ -100,18 +100,19 @@ public class NodeMarkersRegistrationTest extends SolrCloudTestCase {
break;
}
}
- // add a node
+ // add a node
JettySolrRunner node = cluster.startJettySolrRunner();
cluster.waitForAllNodes(30);
if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
fail("onChange listener didn't execute on cluster change");
}
assertEquals(1, listener.addedNodes.size());
- assertEquals(node.getNodeName(), listener.addedNodes.iterator().next());
+ assertTrue(listener.addedNodes.toString(), listener.addedNodes.contains(node.getNodeName()));
// verify that a znode doesn't exist (no trigger)
String pathAdded = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node.getNodeName();
assertFalse("Path " + pathAdded + " was created but there are no nodeAdded triggers", zkClient().exists(pathAdded, true));
listener.reset();
+
// stop overseer
log.info("====== KILL OVERSEER 1");
JettySolrRunner j = cluster.stopJettySolrRunner(overseerLeaderIndex);
@@ -145,8 +146,7 @@ public class NodeMarkersRegistrationTest extends SolrCloudTestCase {
// okay
}
- // verify that a znode does NOT exist - there's no nodeLost trigger,
- // so the new overseer cleaned up existing nodeLost markers
+ // verify that a znode does NOT exist - the new overseer cleaned up existing nodeLost markers
assertFalse("Path " + pathLost + " exists", zkClient().exists(pathLost, true));
listener.reset();
@@ -218,9 +218,29 @@ public class NodeMarkersRegistrationTest extends SolrCloudTestCase {
listenerEventLatch.countDown(); // let the trigger thread continue
assertTrue(triggerFiredLatch.await(10, TimeUnit.SECONDS));
- Thread.sleep(5000);
- // nodeAdded marker should be consumed now by nodeAdded trigger
- assertFalse("Path " + pathAdded + " should have been deleted", zkClient().exists(pathAdded, true));
+
+ // kill this node
+ listener.reset();
+ events.clear();
+ triggerFiredLatch = new CountDownLatch(1);
+
+ String node1Name = node1.getNodeName();
+ cluster.stopJettySolrRunner(node1);
+ if (!listener.onChangeLatch.await(10, TimeUnit.SECONDS)) {
+ fail("onChange listener didn't execute on cluster change");
+ }
+ assertEquals(1, listener.lostNodes.size());
+ assertEquals(node1Name, listener.lostNodes.iterator().next());
+ // verify that a znode exists
+ String pathLost2 = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + node1Name;
+ assertTrue("Path " + pathLost2 + " wasn't created", zkClient().exists(pathLost2, true));
+
+ listenerEventLatch.countDown(); // let the trigger thread continue
+
+ assertTrue(triggerFiredLatch.await(10, TimeUnit.SECONDS));
+
+ // triggers don't remove markers
+ assertTrue("Path " + pathLost2 + " should still exist", zkClient().exists(pathLost2, true));
listener.reset();
events.clear();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f0e875d/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledMaintenanceTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledMaintenanceTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledMaintenanceTriggerTest.java
index 2fe0fa4..53416a0 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledMaintenanceTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/ScheduledMaintenanceTriggerTest.java
@@ -28,9 +28,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.CloudTestUtils;
import org.apache.solr.cloud.CloudTestUtils.AutoScalingRequest;
@@ -38,6 +40,7 @@ import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.cloud.autoscaling.sim.SimCloudManager;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
@@ -83,10 +86,61 @@ public class ScheduledMaintenanceTriggerTest extends SolrCloudTestCase {
}
@Before
- public void initTest() {
+ public void initTest() throws Exception {
+ // suspend the default .scheduled_maintenance trigger; tests that need it
+ // re-create it with their own 'set-trigger' configuration
+ String suspendTriggerCommand = "{" +
+ "'suspend-trigger' : {'name' : '.scheduled_maintenance'}" +
+ "}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, suspendTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals("success", response.get("result").toString());
+ // set the trigger cooldown period to 1 second: scheduled triggers in these tests
+ // fire every 2 seconds, which must be longer than the cooldown period
+ String setPropertiesCommand = "{" +
+ "'set-properties' : {" +
+ "'" + AutoScalingParams.TRIGGER_COOLDOWN_PERIOD_SECONDS + "': 1" +
+ "}" +
+ "}";
+ response = solrClient.request(createAutoScalingRequest(SolrRequest.METHOD.POST, setPropertiesCommand));
+ assertEquals("success", response.get("result").toString());
triggerFired = new CountDownLatch(1);
}
+ /**
+ * Adds a node to the cluster: a simulated node when running with {@link SimCloudManager},
+ * otherwise a newly started Jetty instance.
+ *
+ * @return the name of the new node
+ */
+ private String addNode() throws Exception {
+ if (cloudManager instanceof SimCloudManager) {
+ return ((SimCloudManager) cloudManager).simAddNode();
+ } else {
+ return cluster.startJettySolrRunner().getNodeName();
+ }
+ }
+
+ /**
+ * Stops the node with the given name: removes it from the simulated cluster when running
+ * with {@link SimCloudManager}, otherwise stops the matching Jetty instance. In the latter
+ * case this is a no-op if no running Jetty instance has that node name.
+ *
+ * @param nodeName name of the node to stop
+ */
+ private void stopNode(String nodeName) throws Exception {
+ if (cloudManager instanceof SimCloudManager) {
+ ((SimCloudManager) cloudManager).simRemoveNode(nodeName, true);
+ } else {
+ for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+ if (jetty.getNodeName().equals(nodeName)) {
+ cluster.stopJettySolrRunner(jetty);
+ break;
+ }
+ }
+ }
+ }
+
@After
public void restoreDefaults() throws Exception {
SolrRequest req = AutoScalingRequest.create(SolrRequest.METHOD.POST,
@@ -118,9 +156,10 @@ public class ScheduledMaintenanceTriggerTest extends SolrCloudTestCase {
log.info(autoScalingConfig.toString());
AutoScalingConfig.TriggerConfig triggerConfig = autoScalingConfig.getTriggerConfigs().get(AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_NAME);
assertNotNull(triggerConfig);
- assertEquals(2, triggerConfig.actions.size());
+ assertEquals(3, triggerConfig.actions.size());
assertTrue(triggerConfig.actions.get(0).actionClass.endsWith(InactiveShardPlanAction.class.getSimpleName()));
- assertTrue(triggerConfig.actions.get(1).actionClass.endsWith(ExecutePlanAction.class.getSimpleName()));
+ assertTrue(triggerConfig.actions.get(1).actionClass.endsWith(InactiveMarkersPlanAction.class.getSimpleName()));
+ assertTrue(triggerConfig.actions.get(2).actionClass.endsWith(ExecutePlanAction.class.getSimpleName()));
AutoScalingConfig.TriggerListenerConfig listenerConfig = autoScalingConfig.getTriggerListenerConfigs().get(AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_NAME + ".system");
assertNotNull(listenerConfig);
assertEquals(AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_NAME, listenerConfig.trigger);
@@ -153,7 +192,7 @@ public class ScheduledMaintenanceTriggerTest extends SolrCloudTestCase {
@Override
public void process(TriggerEvent event, ActionContext context) throws Exception {
- if (context.getProperties().containsKey("inactive_shard_plan")) {
+ if (context.getProperties().containsKey("inactive_shard_plan") || context.getProperties().containsKey("inactive_markers_plan")) {
triggerFired.countDown();
}
}
@@ -201,7 +240,7 @@ public class ScheduledMaintenanceTriggerTest extends SolrCloudTestCase {
"'name' : '" + AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_NAME + "'," +
"'event' : 'scheduled'," +
"'startTime' : 'NOW+10SECONDS'," +
- "'every' : '+2SECONDS'," +
+ "'every' : '+2SECONDS'," + // must be longer than the cooldown period
"'enabled' : true," +
"'actions' : [{'name' : 'inactive_shard_plan', 'class' : 'solr.InactiveShardPlanAction', 'ttl' : '20'}," +
"{'name' : 'execute_plan', 'class' : '" + ExecutePlanAction.class.getName() + "'}," +
@@ -272,4 +311,89 @@ public class ScheduledMaintenanceTriggerTest extends SolrCloudTestCase {
CloudTestUtils.clusterShape(2, 1).matches(state.getLiveNodes(), state.getCollection(collection1));
}
+
+ /**
+ * Returns the latch that the test actions in this class count down when they run.
+ * Tests replace the latch with a fresh instance before each phase they wait on.
+ */
+ public static CountDownLatch getTriggerFired() {
+ return triggerFired;
+ }
+
+ /**
+ * Test action that counts down the current {@code triggerFired} latch every time it runs,
+ * regardless of the contents of the action context.
+ */
+ public static class TestTriggerAction2 extends TriggerActionBase {
+
+ @Override
+ public void process(TriggerEvent event, ActionContext context) throws Exception {
+ getTriggerFired().countDown();
+ }
+ }
+
+
+ /**
+ * Verifies that a nodeAdded marker is still present after the nodeAdded trigger has fired,
+ * and that the scheduled maintenance trigger running {@code InactiveMarkersPlanAction}
+ * removes it afterwards.
+ */
+ @Test
+ public void testInactiveMarkersCleanup() throws Exception {
+ triggerFired = new CountDownLatch(1);
+ // nodeAdded trigger whose only action (TestTriggerAction2) counts down triggerFired
+ String setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : 'trigger1'," +
+ "'event' : 'nodeAdded'," +
+ "'waitFor': '1s'," +
+ "'enabled' : true," +
+ "'actions' : [" +
+ "{'name' : 'test', 'class' : '" + TestTriggerAction2.class.getName() + "'}]" +
+ "}}";
+ SolrRequest req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ NamedList<Object> response = solrClient.request(req);
+ assertEquals("success", response.get("result").toString());
+
+ // scheduled maintenance trigger: InactiveMarkersPlanAction ('ttl' : '20', presumably
+ // seconds) followed by TestTriggerAction, which counts down triggerFired when the
+ // action context contains an 'inactive_markers_plan' entry
+ setTriggerCommand = "{" +
+ "'set-trigger' : {" +
+ "'name' : '" + AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_NAME + "'," +
+ "'event' : 'scheduled'," +
+ "'startTime' : 'NOW+20SECONDS'," +
+ "'every' : '+2SECONDS'," + // must be longer than the cooldown period!!
+ "'enabled' : true," +
+ "'actions' : [{'name' : 'inactive_markers_plan', 'class' : 'solr.InactiveMarkersPlanAction', 'ttl' : '20'}," +
+ "{'name' : 'test', 'class' : '" + TestTriggerAction.class.getName() + "'}]" +
+ "}}";
+
+ req = createAutoScalingRequest(SolrRequest.METHOD.POST, setTriggerCommand);
+ response = solrClient.request(req);
+ assertEquals("success", response.get("result").toString());
+
+ // NOTE(review): fixed delay, presumably to let the new trigger configuration take effect
+ cloudManager.getTimeSource().sleep(5000);
+
+ // adding a node should fire trigger1 and leave a nodeAdded marker behind
+ triggerFired = new CountDownLatch(1);
+ String node = addNode();
+
+ boolean await = triggerFired.await(30, TimeUnit.SECONDS);
+ assertTrue("trigger should have fired", await);
+
+ // the next count-down is expected from the scheduled maintenance trigger
+ triggerFired = new CountDownLatch(1);
+
+ // should have a marker
+ DistribStateManager stateManager = cloudManager.getDistribStateManager();
+ String nodeAddedPath = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + node;
+ assertTrue("marker for nodeAdded doesn't exist", stateManager.hasData(nodeAddedPath));
+
+ // wait for the cleanup to fire
+ await = triggerFired.await(90, TimeUnit.SECONDS);
+ assertTrue("cleanup trigger should have fired", await);
+ assertFalse("marker for nodeAdded still exists", stateManager.hasData(nodeAddedPath));
+ }
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1f0e875d/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
index fd6c955..757c0bd 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/sim/SimClusterStateProvider.java
@@ -285,10 +285,10 @@ public class SimClusterStateProvider implements ClusterStateProvider {
if (liveNodes.contains(nodeId)) {
throw new Exception("Node " + nodeId + " already exists");
}
- liveNodes.add(nodeId);
createEphemeralLiveNode(nodeId);
- updateOverseerLeader();
nodeReplicaMap.putIfAbsent(nodeId, new ArrayList<>());
+ liveNodes.add(nodeId);
+ updateOverseerLeader();
}
/**
@@ -310,12 +310,16 @@ public class SimClusterStateProvider implements ClusterStateProvider {
}
// remove ephemeral nodes
stateManager.getRoot().removeEphemeralChildren(nodeId);
- updateOverseerLeader();
// create a nodeLost marker if needed
AutoScalingConfig cfg = stateManager.getAutoScalingConfig(null);
if (cfg.hasTriggerForEvents(TriggerEventType.NODELOST)) {
- stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeId);
+ String path = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeId;
+ byte[] json = Utils.toJSON(Collections.singletonMap("timestamp", cloudManager.getTimeSource().getEpochTimeNs()));
+ stateManager.makePath(path,
+ json, CreateMode.PERSISTENT, false);
+ log.debug(" -- created marker: {}", path);
}
+ updateOverseerLeader();
if (!collections.isEmpty()) {
simRunLeaderElection(collections, true);
}
@@ -388,7 +392,10 @@ public class SimClusterStateProvider implements ClusterStateProvider {
mgr.makePath(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeId, null, CreateMode.EPHEMERAL, true);
AutoScalingConfig cfg = stateManager.getAutoScalingConfig(null);
if (cfg.hasTriggerForEvents(TriggerEventType.NODEADDED)) {
- mgr.makePath(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeId, null, CreateMode.EPHEMERAL, true);
+ byte[] json = Utils.toJSON(Collections.singletonMap("timestamp", cloudManager.getTimeSource().getEpochTimeNs()));
+ String path = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeId;
+ log.debug("-- creating marker: {}", path);
+ mgr.makePath(path, json, CreateMode.EPHEMERAL, true);
}
}