You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/05/23 18:00:22 UTC
[8/8] lucene-solr:jira/solr-10515: SOLR-10515 Fix enqueue/dequeue,
add javadoc and some debug logging.
SOLR-10515 Fix enqueue/dequeue, add javadoc and some debug logging.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/39464e2c
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/39464e2c
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/39464e2c
Branch: refs/heads/jira/solr-10515
Commit: 39464e2cb8dab8ad99e7e01d7b4589e948f6ff8b
Parents: c252c55
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Tue May 23 19:59:34 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Tue May 23 19:59:34 2017 +0200
----------------------------------------------------------------------
.../solr/cloud/autoscaling/AutoScaling.java | 3 +-
.../cloud/autoscaling/NodeAddedTrigger.java | 16 ++++----
.../solr/cloud/autoscaling/NodeLostTrigger.java | 12 +++---
.../cloud/autoscaling/ScheduledTriggers.java | 39 +++++++++++++-------
.../solr/cloud/autoscaling/TriggerBase.java | 10 +++--
.../autoscaling/TriggerIntegrationTest.java | 23 +++++++-----
6 files changed, 62 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/39464e2c/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
index 0d6da79..4beffd8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
@@ -86,7 +86,8 @@ public class AutoScaling {
* This method is executed when a trigger is ready to fire.
*
* @param event a subclass of {@link TriggerEvent}
- * @return true if the listener was ready to perform actions on the event, false otherwise.
+ * @return true if the listener was ready to perform actions on the event, false
+ * otherwise. If false was returned then callers should assume the event was discarded.
*/
boolean triggerFired(E event);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/39464e2c/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
index d783f86..c3bbffe 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
@@ -29,6 +29,8 @@ import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -58,7 +60,7 @@ public class NodeAddedTrigger extends TriggerBase<NodeAddedTrigger.NodeAddedEven
private Set<String> lastLiveNodes;
- private Map<String, Long> nodeNameVsTimeAdded = new HashMap<>();
+ private Map<String, Long> nodeNameVsTimeAdded = new TreeMap<>();
public NodeAddedTrigger(String name, Map<String, Object> properties,
CoreContainer container) {
@@ -78,7 +80,7 @@ public class NodeAddedTrigger extends TriggerBase<NodeAddedTrigger.NodeAddedEven
} else {
actions = Collections.emptyList();
}
- lastLiveNodes = container.getZkController().getZkStateReader().getClusterState().getLiveNodes();
+ lastLiveNodes = new TreeSet<>(container.getZkController().getZkStateReader().getClusterState().getLiveNodes());
log.debug("Initial livenodes: {}", lastLiveNodes);
this.enabled = (boolean) properties.getOrDefault("enabled", true);
this.waitForSecond = ((Long) properties.getOrDefault("waitFor", -1L)).intValue();
@@ -155,8 +157,8 @@ public class NodeAddedTrigger extends TriggerBase<NodeAddedTrigger.NodeAddedEven
if (old instanceof NodeAddedTrigger) {
NodeAddedTrigger that = (NodeAddedTrigger) old;
assert this.name.equals(that.name);
- this.lastLiveNodes = new HashSet<>(that.lastLiveNodes);
- this.nodeNameVsTimeAdded = new HashMap<>(that.nodeNameVsTimeAdded);
+ this.lastLiveNodes = new TreeSet<>(that.lastLiveNodes);
+ this.nodeNameVsTimeAdded = new TreeMap<>(that.nodeNameVsTimeAdded);
} else {
throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
"Unable to restore state from an unknown type of trigger");
@@ -199,10 +201,6 @@ public class NodeAddedTrigger extends TriggerBase<NodeAddedTrigger.NodeAddedEven
ZkStateReader reader = container.getZkController().getZkStateReader();
Set<String> newLiveNodes = reader.getClusterState().getLiveNodes();
log.debug("Found livenodes: {}", newLiveNodes);
- if (lastLiveNodes == null) {
- lastLiveNodes = newLiveNodes;
- return;
- }
// have any nodes that we were tracking been removed from the cluster?
// if so, remove them from the tracking map
@@ -238,7 +236,7 @@ public class NodeAddedTrigger extends TriggerBase<NodeAddedTrigger.NodeAddedEven
}
}
- lastLiveNodes = newLiveNodes;
+ lastLiveNodes = new TreeSet(newLiveNodes);
} catch (RuntimeException e) {
log.error("Unexpected exception in NodeAddedTrigger", e);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/39464e2c/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
index 2024a30..feea94d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -29,6 +29,8 @@ import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -58,7 +60,7 @@ public class NodeLostTrigger extends TriggerBase<NodeLostTrigger.NodeLostEvent>
private Set<String> lastLiveNodes;
- private Map<String, Long> nodeNameVsTimeRemoved = new HashMap<>();
+ private Map<String, Long> nodeNameVsTimeRemoved = new TreeMap<>();
public NodeLostTrigger(String name, Map<String, Object> properties,
CoreContainer container) {
@@ -78,7 +80,7 @@ public class NodeLostTrigger extends TriggerBase<NodeLostTrigger.NodeLostEvent>
} else {
actions = Collections.emptyList();
}
- lastLiveNodes = container.getZkController().getZkStateReader().getClusterState().getLiveNodes();
+ lastLiveNodes = new TreeSet<>(container.getZkController().getZkStateReader().getClusterState().getLiveNodes());
log.debug("Initial livenodes: {}", lastLiveNodes);
this.enabled = (boolean) properties.getOrDefault("enabled", true);
this.waitForSecond = ((Long) properties.getOrDefault("waitFor", -1L)).intValue();
@@ -154,8 +156,8 @@ public class NodeLostTrigger extends TriggerBase<NodeLostTrigger.NodeLostEvent>
if (old instanceof NodeLostTrigger) {
NodeLostTrigger that = (NodeLostTrigger) old;
assert this.name.equals(that.name);
- this.lastLiveNodes = new HashSet<>(that.lastLiveNodes);
- this.nodeNameVsTimeRemoved = new HashMap<>(that.nodeNameVsTimeRemoved);
+ this.lastLiveNodes = new TreeSet<>(that.lastLiveNodes);
+ this.nodeNameVsTimeRemoved = new TreeMap<>(that.nodeNameVsTimeRemoved);
} else {
throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
"Unable to restore state from an unknown type of trigger");
@@ -230,7 +232,7 @@ public class NodeLostTrigger extends TriggerBase<NodeLostTrigger.NodeLostEvent>
}
}
- lastLiveNodes = newLiveNodes;
+ lastLiveNodes = new TreeSet<>(newLiveNodes);
} catch (RuntimeException e) {
log.error("Unexpected exception in NodeLostTrigger", e);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/39464e2c/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
index f14bcb7..3e94d1c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
@@ -123,23 +123,22 @@ public class ScheduledTriggers implements Closeable {
ScheduledTrigger scheduledSource = scheduledTriggers.get(event.getSource());
if (scheduledSource == null) {
log.warn("Ignoring autoscaling event " + event + " because the source trigger: " + event.getSource() + " doesn't exist.");
- // XXX not sure what to return here...
- return true;
+ return false;
}
boolean replaying = event.getProperty(TriggerEventBase.REPLAYING) != null ? (Boolean)event.getProperty(TriggerEventBase.REPLAYING) : false;
- final boolean enqueued;
- if (replaying) {
- enqueued = false;
- } else {
- enqueued = scheduledTrigger.enqueue(event);
- }
AutoScaling.Trigger source = scheduledSource.trigger;
if (source.isClosed()) {
log.warn("Ignoring autoscaling event " + event + " because the source trigger: " + source + " has already been closed");
- // we do not want to lose this event just because the trigger were closed, perhaps a replacement will need it
+ // we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
return false;
}
if (hasPendingActions.compareAndSet(false, true)) {
+ final boolean enqueued;
+ if (replaying) {
+ enqueued = false;
+ } else {
+ enqueued = scheduledTrigger.enqueue(event);
+ }
List<TriggerAction> actions = source.getActions();
if (actions != null) {
actionExecutor.submit(() -> {
@@ -165,6 +164,12 @@ public class ScheduledTriggers implements Closeable {
hasPendingActions.set(false);
}
});
+ } else {
+ if (enqueued) {
+ AutoScaling.TriggerEvent ev = scheduledTrigger.dequeue();
+ assert ev.getId().equals(event.getId());
+ }
+ hasPendingActions.set(false);
}
return true;
} else {
@@ -244,10 +249,10 @@ public class ScheduledTriggers implements Closeable {
if (isClosed) {
throw new AlreadyClosedException("ScheduledTrigger " + trigger.getName() + " has been closed.");
}
- log.debug("-- run scheduled trigger " + trigger.getName());
+ log.debug("--scheduled trigger " + trigger.getName());
// replay accumulated events on first run, if any
if (replay) {
- log.debug("--replaying...");
+ log.debug(" --replaying...");
AutoScaling.TriggerEvent event;
// peek first without removing - we may crash before calling the listener
while ((event = queue.peekEvent()) != null) {
@@ -256,17 +261,23 @@ public class ScheduledTriggers implements Closeable {
if (! trigger.getListener().triggerFired(event)) {
log.error("Failed to re-play event, discarding: " + event);
}
- log.debug("--replayed event: " + event);
+ log.debug(" --replayed event: " + event);
queue.pollEvent(); // always remove it from queue
}
// now restore saved state to possibly generate new events from old state on the first run
- trigger.restoreState();
+ try {
+ trigger.restoreState();
+ } catch (Exception e) {
+ // log but don't throw - see below
+ log.error("Error restoring trigger state " + trigger.getName(), e);
+ }
replay = false;
}
// fire a trigger only if an action is not pending
// note this is not foolproof, e.g. it does not prevent an action being executed while a trigger
// is still executing. There is additional protection against that scenario in the event listener.
if (!hasPendingActions.get()) {
+ log.debug(" --run " + trigger.getName());
try {
trigger.run();
} catch (Exception e) {
@@ -277,6 +288,8 @@ public class ScheduledTriggers implements Closeable {
// checkpoint after each run
trigger.saveState();
}
+ } else {
+ log.debug(" --hasPendingActions - skipping " + trigger.getName());
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/39464e2c/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
index b94824c..235cc9d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
@@ -17,7 +17,9 @@
package org.apache.solr.cloud.autoscaling;
import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
import java.util.Map;
+import java.util.TreeMap;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -63,9 +65,10 @@ public abstract class TriggerBase<E extends AutoScaling.TriggerEvent> implements
@Override
public void saveState() {
Map<String,Object> state = getState();
- byte[] data = Utils.toJSON(state);
+ TreeMap<String, Object> map = new TreeMap<>(state);
+ byte[] data = Utils.toJSON(map);
// skip saving if identical
- if (lastState != null && lastState.equals(data)) {
+ if (lastState != null && Arrays.equals(lastState, data)) {
LOG.debug("--skip saving " + getName());
return;
}
@@ -89,7 +92,6 @@ public abstract class TriggerBase<E extends AutoScaling.TriggerEvent> implements
public void restoreState() {
byte[] data = null;
String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + getName();
- LOG.info("--restore state: " + path);
try {
if (zkClient.exists(path, true)) {
data = zkClient.getData(path, null, new Stat(), true);
@@ -99,7 +101,7 @@ public abstract class TriggerBase<E extends AutoScaling.TriggerEvent> implements
}
if (data != null) {
Map<String, Object> state = (Map<String, Object>) Utils.fromJSON(data);
- LOG.info(" -- state: " + state);
+ LOG.info("-- restored state of " + path + ": " + state);
setState(state);
lastState = data;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/39464e2c/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index 235d748..ac22500 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -189,7 +189,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
}
}
- if (!triggerFiredLatch.await(10, TimeUnit.SECONDS)) {
+ if (!triggerFiredLatch.await(20, TimeUnit.SECONDS)) {
fail("Both triggers should have fired by now");
}
}
@@ -465,7 +465,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
// stop the overseer, somebody else will take over as the overseer
cluster.stopJettySolrRunner(index);
-
+ Thread.sleep(10000);
JettySolrRunner newNode = cluster.startJettySolrRunner();
boolean await = triggerFiredLatch.await(20, TimeUnit.SECONDS);
assertTrue("The trigger did not fire at all", await);
@@ -495,14 +495,19 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
@Override
public void process(AutoScaling.TriggerEvent event) {
- if (triggerFired.compareAndSet(false, true)) {
- eventRef.set(event);
- if (System.nanoTime() - event.getEventTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
- fail("NodeAddedListener was fired before the configured waitFor period");
+ try {
+ if (triggerFired.compareAndSet(false, true)) {
+ eventRef.set(event);
+ if (System.nanoTime() - event.getEventTime() <= TimeUnit.NANOSECONDS.convert(waitForSeconds, TimeUnit.SECONDS)) {
+ fail("NodeAddedListener was fired before the configured waitFor period");
+ }
+ triggerFiredLatch.countDown();
+ } else {
+ fail("NodeAddedTrigger was fired more than once!");
}
- triggerFiredLatch.countDown();
- } else {
- fail("NodeAddedTrigger was fired more than once!");
+ } catch (Throwable t) {
+ log.info("--throwable", t);
+ throw t;
}
}