You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/05/16 10:27:13 UTC
lucene-solr:jira/solr-10515: SOLR-10515: Initial support for
per-trigger ZK queues for events.
Repository: lucene-solr
Updated Branches:
refs/heads/jira/solr-10515 [created] 247270713
SOLR-10515: Initial support for per-trigger ZK queues for events.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/24727071
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/24727071
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/24727071
Branch: refs/heads/jira/solr-10515
Commit: 247270713cce25dacfa07799b61774d427de8b67
Parents: 607184c
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Tue May 16 12:26:23 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Tue May 16 12:26:23 2017 +0200
----------------------------------------------------------------------
.../solr/cloud/autoscaling/AutoScaling.java | 41 ++++----
.../cloud/autoscaling/NodeAddedTrigger.java | 44 +-------
.../solr/cloud/autoscaling/NodeLostTrigger.java | 44 +-------
.../autoscaling/OverseerTriggerThread.java | 2 +-
.../cloud/autoscaling/ScheduledTriggers.java | 105 +++++++++++++------
.../cloud/autoscaling/TriggerEventBase.java | 84 +++++++++++++++
.../cloud/autoscaling/TriggerEventQueue.java | 80 ++++++++++++++
.../cloud/autoscaling/NodeAddedTriggerTest.java | 4 +-
.../cloud/autoscaling/NodeLostTriggerTest.java | 4 +-
.../autoscaling/TriggerIntegrationTest.java | 14 +--
.../apache/solr/common/cloud/ZkStateReader.java | 1 +
11 files changed, 282 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/24727071/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
index cd08ea9..0b6aca3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
@@ -19,6 +19,7 @@ package org.apache.solr.cloud.autoscaling;
import java.io.Closeable;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -47,24 +48,28 @@ public class AutoScaling {
AFTER_ACTION
}
- public static interface TriggerEvent<T extends Trigger> {
- public T getSource();
+ public interface TriggerEvent {
+ EventType getEventType();
- public long getEventNanoTime();
+ String getSource();
- public void setContext(Map<String, Object> context);
+ long getEventNanoTime();
- public Map<String, Object> getContext();
+ void setProperties(Map<String, Object> properties);
+
+ Map<String, Object> getProperties();
+
+ Object getProperty(String name);
}
- public static interface TriggerListener<E extends TriggerEvent<? extends Trigger>> {
+ public interface TriggerListener<E extends TriggerEvent> {
/**
* This method is executed when a trigger is ready to fire.
*
* @param event a subclass of {@link TriggerEvent}
* @return true if the listener was ready to perform actions on the event, false otherwise.
*/
- public boolean triggerFired(E event);
+ boolean triggerFired(E event);
}
public static class HttpCallbackListener implements TriggerListener {
@@ -94,26 +99,26 @@ public class AutoScaling {
*
* @param <E> the {@link TriggerEvent} which is handled by this Trigger
*/
- public static interface Trigger<E extends TriggerEvent<? extends Trigger>> extends Closeable, Runnable {
- public String getName();
+ public interface Trigger<E extends TriggerEvent> extends Closeable, Runnable {
+ String getName();
- public EventType getEventType();
+ EventType getEventType();
- public boolean isEnabled();
+ boolean isEnabled();
- public Map<String, Object> getProperties();
+ Map<String, Object> getProperties();
- public int getWaitForSecond();
+ int getWaitForSecond();
- public List<TriggerAction> getActions();
+ List<TriggerAction> getActions();
- public void setListener(TriggerListener<E> listener);
+ void setListener(TriggerListener<E> listener);
- public TriggerListener<E> getListener();
+ TriggerListener<E> getListener();
- public boolean isClosed();
+ boolean isClosed();
- public void restoreState(Trigger<E> old);
+ void restoreState(Trigger<E> old);
}
public static class TriggerFactory implements Closeable {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/24727071/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
index 5a340be..86cc13e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
@@ -202,7 +202,7 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
AutoScaling.TriggerListener<NodeAddedEvent> listener = listenerRef.get();
if (listener != null) {
log.info("NodeAddedTrigger {} firing registered listener for node: {} added at {} nanotime, now: {} nanotime", name, nodeName, timeAdded, now);
- if (listener.triggerFired(new NodeAddedEvent(this, timeAdded, nodeName))) {
+ if (listener.triggerFired(new NodeAddedEvent(getEventType(), getName(), timeAdded, nodeName))) {
// remove from tracking set only if the fire was accepted
trackingKeySet.remove(nodeName);
}
@@ -225,45 +225,11 @@ public class NodeAddedTrigger implements AutoScaling.Trigger<NodeAddedTrigger.No
}
}
- public static class NodeAddedEvent implements AutoScaling.TriggerEvent<NodeAddedTrigger> {
- private final NodeAddedTrigger source;
- private final long nodeAddedNanoTime;
- private final String nodeName;
+ public static class NodeAddedEvent extends TriggerEventBase {
+ public static final String NODE_NAME = "nodeName";
- private Map<String, Object> context;
-
- public NodeAddedEvent(NodeAddedTrigger source, long nodeAddedNanoTime, String nodeAdded) {
- this.source = source;
- this.nodeAddedNanoTime = nodeAddedNanoTime;
- this.nodeName = nodeAdded;
- }
-
- @Override
- public NodeAddedTrigger getSource() {
- return source;
- }
-
- @Override
- public long getEventNanoTime() {
- return nodeAddedNanoTime;
- }
-
- public String getNodeName() {
- return nodeName;
- }
-
- public AutoScaling.EventType getType() {
- return source.getEventType();
- }
-
- @Override
- public void setContext(Map<String, Object> context) {
- this.context = context;
- }
-
- @Override
- public Map<String, Object> getContext() {
- return context;
+ public NodeAddedEvent(AutoScaling.EventType eventType, String source, long nodeAddedNanoTime, String nodeAdded) {
+ super(eventType, source, nodeAddedNanoTime, Collections.singletonMap(NODE_NAME, nodeAdded));
}
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/24727071/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
index 385eca5..35c94ac 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -197,7 +197,7 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.Node
AutoScaling.TriggerListener<NodeLostEvent> listener = listenerRef.get();
if (listener != null) {
log.info("NodeLostTrigger firing registered listener");
- if (listener.triggerFired(new NodeLostEvent(this, timeRemoved, nodeName))) {
+ if (listener.triggerFired(new NodeLostEvent(getEventType(), getName(), timeRemoved, nodeName))) {
trackingKeySet.remove(nodeName);
}
} else {
@@ -219,45 +219,11 @@ public class NodeLostTrigger implements AutoScaling.Trigger<NodeLostTrigger.Node
}
}
- public static class NodeLostEvent implements AutoScaling.TriggerEvent<NodeLostTrigger> {
- private final NodeLostTrigger source;
- private final long nodeLostNanoTime;
- private final String nodeName;
+ public static class NodeLostEvent extends TriggerEventBase {
+ public static final String NODE_NAME = "nodeName";
- private Map<String, Object> context;
-
- public NodeLostEvent(NodeLostTrigger source, long nodeLostNanoTime, String nodeRemoved) {
- this.source = source;
- this.nodeLostNanoTime = nodeLostNanoTime;
- this.nodeName = nodeRemoved;
- }
-
- @Override
- public NodeLostTrigger getSource() {
- return source;
- }
-
- @Override
- public long getEventNanoTime() {
- return nodeLostNanoTime;
- }
-
- public String getNodeName() {
- return nodeName;
- }
-
- public AutoScaling.EventType getType() {
- return source.getEventType();
- }
-
- @Override
- public void setContext(Map<String, Object> context) {
- this.context = context;
- }
-
- @Override
- public Map<String, Object> getContext() {
- return context;
+ public NodeLostEvent(AutoScaling.EventType eventType, String source, long nodeLostNanoTime, String nodeRemoved) {
+ super(eventType, source, nodeLostNanoTime, Collections.singletonMap(NODE_NAME, nodeRemoved));
}
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/24727071/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
index eca8c0b..0db38fb 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
@@ -74,7 +74,7 @@ public class OverseerTriggerThread implements Runnable, Closeable {
public OverseerTriggerThread(ZkController zkController) {
this.zkController = zkController;
zkStateReader = zkController.getZkStateReader();
- scheduledTriggers = new ScheduledTriggers();
+ scheduledTriggers = new ScheduledTriggers(zkController.getZkClient());
triggerFactory = new AutoScaling.TriggerFactory(zkController.getCoreContainer());
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/24727071/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
index 8dfbddf..9b80f09 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
@@ -35,6 +36,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.solr.cloud.ActionThrottle;
+import org.apache.solr.cloud.DistributedQueue;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.util.DefaultSolrThreadFactory;
@@ -50,7 +55,7 @@ public class ScheduledTriggers implements Closeable {
static final int DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS = 1;
static final int DEFAULT_MIN_MS_BETWEEN_ACTIONS = 5000;
- private final Map<String, ScheduledTrigger> scheduledTriggers = new HashMap<>();
+ private final Map<String, ScheduledTrigger> scheduledTriggers = new ConcurrentHashMap<>();
/**
* Thread pool for scheduling the triggers
@@ -70,7 +75,11 @@ public class ScheduledTriggers implements Closeable {
private final ActionThrottle actionThrottle;
- public ScheduledTriggers() {
+ private final SolrZkClient zkClient;
+
+ private final Overseer.Stats queueStats;
+
+ public ScheduledTriggers(SolrZkClient zkClient) {
// todo make the core pool size configurable
// it is important to use more than one because a time taking trigger can starve other scheduled triggers
// ideally we should have as many core threads as the number of triggers but firstly, we don't know beforehand
@@ -83,6 +92,8 @@ public class ScheduledTriggers implements Closeable {
actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor"));
// todo make the wait time configurable
actionThrottle = new ActionThrottle("action", DEFAULT_MIN_MS_BETWEEN_ACTIONS);
+ this.zkClient = zkClient;
+ queueStats = new Overseer.Stats();
}
/**
@@ -97,7 +108,7 @@ public class ScheduledTriggers implements Closeable {
if (isClosed) {
throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used anymore");
}
- ScheduledTrigger scheduledTrigger = new ScheduledTrigger(newTrigger);
+ ScheduledTrigger scheduledTrigger = new ScheduledTrigger(newTrigger, zkClient, queueStats);
ScheduledTrigger old = scheduledTriggers.putIfAbsent(newTrigger.getName(), scheduledTrigger);
if (old != null) {
if (old.trigger.equals(newTrigger)) {
@@ -109,38 +120,55 @@ public class ScheduledTriggers implements Closeable {
scheduledTriggers.replace(newTrigger.getName(), scheduledTrigger);
}
newTrigger.setListener(event -> {
- AutoScaling.Trigger source = event.getSource();
+ ScheduledTrigger scheduledSource = scheduledTriggers.get(event.getSource());
+ if (scheduledSource == null) {
+ log.warn("Ignoring autoscaling event " + event + " because the source trigger: " + event.getSource() + " doesn't exist.");
+ // XXX not sure what to return here... returning true makes the calling trigger treat the event as accepted and stop tracking it
+ return true;
+ }
+ AutoScaling.Trigger source = scheduledSource.trigger;
if (source.isClosed()) {
- log.warn("Ignoring autoscaling event because the source trigger: " + source + " has already been closed");
+ log.warn("Ignoring autoscaling event " + event + " because the source trigger: " + source + " has already been closed");
// we do not want to lose this event just because the trigger were closed, perhaps a replacement will need it
return false;
}
- if (hasPendingActions.compareAndSet(false, true)) {
- List<TriggerAction> actions = source.getActions();
- if (actions != null) {
- actionExecutor.submit(() -> {
- assert hasPendingActions.get();
- try {
- // let the action executor thread wait instead of the trigger thread so we use the throttle here
- actionThrottle.minimumWaitBetweenActions();
- actionThrottle.markAttemptingAction();
- for (TriggerAction action : actions) {
- try {
- action.process(event);
- } catch (Exception e) {
- log.error("Error executing action: " + action.getName() + " for trigger event: " + event, e);
- throw e;
+ boolean replaying = event.getProperty(TriggerEventBase.REPLAYING) != null ? (Boolean)event.getProperty(TriggerEventBase.REPLAYING) : false;
+ boolean enqueued = false;
+ try {
+ if (!replaying) {
+ enqueued = scheduledTrigger.enqueue(event);
+ }
+ if (hasPendingActions.compareAndSet(false, true)) {
+ List<TriggerAction> actions = source.getActions();
+ if (actions != null) {
+ actionExecutor.submit(() -> {
+ assert hasPendingActions.get();
+ try {
+ // let the action executor thread wait instead of the trigger thread so we use the throttle here
+ actionThrottle.minimumWaitBetweenActions();
+ actionThrottle.markAttemptingAction();
+ for (TriggerAction action : actions) {
+ try {
+ action.process(event);
+ } catch (Exception e) {
+ log.error("Error executing action: " + action.getName() + " for trigger event: " + event, e);
+ throw e;
+ }
}
+ } finally {
+ hasPendingActions.set(false);
}
- } finally {
- hasPendingActions.set(false);
- }
- });
+ });
+ }
+ return true;
+ } else {
+ // there is an action in the queue and we don't want to enqueue another until it is complete
+ return false;
+ }
+ } finally {
+ if (enqueued) {
+ scheduledTrigger.dequeue();
}
- return true;
- } else {
- // there is an action in the queue and we don't want to enqueue another until it is complete
- return false;
}
});
scheduledTrigger.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(scheduledTrigger, 0, DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS, TimeUnit.SECONDS);
@@ -150,12 +178,8 @@ public class ScheduledTriggers implements Closeable {
* Removes and stops the trigger with the given name
*
* @param triggerName the name of the trigger to be removed
- * @throws AlreadyClosedException if this class has already been closed
*/
public synchronized void remove(String triggerName) {
- if (isClosed) {
- throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used any more");
- }
ScheduledTrigger removed = scheduledTriggers.remove(triggerName);
IOUtils.closeQuietly(removed);
}
@@ -184,13 +208,28 @@ public class ScheduledTriggers implements Closeable {
private class ScheduledTrigger implements Runnable, Closeable {
AutoScaling.Trigger trigger;
ScheduledFuture<?> scheduledFuture;
+ TriggerEventQueue queue;
- ScheduledTrigger(AutoScaling.Trigger trigger) {
+ ScheduledTrigger(AutoScaling.Trigger trigger, SolrZkClient zkClient, Overseer.Stats stats) {
this.trigger = trigger;
+ this.queue = new TriggerEventQueue(zkClient, trigger.getName(), stats);
+ }
+
+ public boolean enqueue(AutoScaling.TriggerEvent event) {
+ return queue.offerEvent(event);
+ }
+
+ public AutoScaling.TriggerEvent dequeue() {
+ return queue.pollEvent();
}
@Override
public void run() {
+ // replay accumulated events first, if any
+ AutoScaling.TriggerEvent event;
+ while ((event = queue.pollEvent()) != null) {
+ trigger.getListener().triggerFired(event);
+ }
// fire a trigger only if an action is not pending
// note this is not fool proof e.g. it does not prevent an action being executed while a trigger
// is still executing. There is additional protection against that scenario in the event listener.
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/24727071/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventBase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventBase.java
new file mode 100644
index 0000000..2290e40
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventBase.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud.autoscaling;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
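+ * Base class for {@link AutoScaling.TriggerEvent} implementations; it holds the event type, the source trigger name, the event time and a mutable map of properties.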
+ */
+public abstract class TriggerEventBase implements AutoScaling.TriggerEvent {
+ public static final String REPLAYING = "replaying";
+
+ protected final String source;
+ protected final long eventNanoTime;
+ protected final AutoScaling.EventType eventType;
+ protected final Map<String, Object> properties = new HashMap<>();
+
+ protected TriggerEventBase(AutoScaling.EventType eventType, String source, long eventNanoTime,
+ Map<String, Object> properties) {
+ this.eventType = eventType;
+ this.source = source;
+ this.eventNanoTime = eventNanoTime;
+ if (properties != null) {
+ this.properties.putAll(properties);
+ }
+ }
+
+ @Override
+ public String getSource() {
+ return source;
+ }
+
+ @Override
+ public long getEventNanoTime() {
+ return eventNanoTime;
+ }
+
+ @Override
+ public Map<String, Object> getProperties() {
+ return properties;
+ }
+
+ @Override
+ public Object getProperty(String name) {
+ return properties.get(name);
+ }
+
+ @Override
+ public AutoScaling.EventType getEventType() {
+ return eventType;
+ }
+
+ @Override
+ public void setProperties(Map<String, Object> context) {
+ this.properties.clear();
+ if (context != null) {
+ this.properties.putAll(context);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return this.getClass().getSimpleName() + "{" +
+ "source='" + source + '\'' +
+ ", eventNanoTime=" + eventNanoTime +
+ ", properties=" + properties +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/24727071/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
new file mode 100644
index 0000000..3bf79a6
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
@@ -0,0 +1,80 @@
+package org.apache.solr.cloud.autoscaling;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.solr.cloud.DistributedQueue;
+import org.apache.solr.cloud.Overseer;
+import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
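+ * A {@link DistributedQueue} kept in ZooKeeper under a per-trigger path, used to store a trigger's events as JSON so that they can later be polled back as {@link ReplayingEvent}s.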
+ */
+public class TriggerEventQueue extends DistributedQueue {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static class ReplayingEvent extends TriggerEventBase {
+
+ public ReplayingEvent(AutoScaling.EventType eventType, String source, long eventNanoTime, Map<String, Object> properties) {
+ super(eventType, source, eventNanoTime, properties);
+ this.properties.put(REPLAYING, true);
+ }
+ }
+
+ private final String triggerName;
+
+ public TriggerEventQueue(SolrZkClient zookeeper, String triggerName, Overseer.Stats stats) {
+ super(zookeeper, ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH + "/" + triggerName, stats);
+ this.triggerName = triggerName;
+ }
+
+ public boolean offerEvent(AutoScaling.TriggerEvent event) {
+ try {
+ // yuck, serializing simple beans should be supported by Utils...
+ Map<String, Object> map = new HashMap<>();
+ map.put("eventType", event.getEventType().toString());
+ map.put("source", event.getSource());
+ map.put("eventNanoTime", event.getEventNanoTime());
+ map.put("properties", event.getProperties());
+ byte[] data = Utils.toJSON(map);
+ offer(data);
+ return true;
+ } catch (KeeperException | InterruptedException e) {
+ LOG.warn("Exception adding event " + event + " to queue " + triggerName, e);
+ return false;
+ }
+ }
+
+ public AutoScaling.TriggerEvent pollEvent() {
+ byte[] data;
+ try {
+ while ((data = poll()) != null) {
+ if (data.length == 0) {
+ LOG.warn("ignoring empty data...");
+ continue;
+ }
+ try {
+ Map<String, Object> map = (Map<String, Object>) Utils.fromJSON(data);
+ String source = (String)map.get("source");
+ long eventNanoTime = ((Number)map.get("eventNanoTime")).longValue();
+ AutoScaling.EventType eventType = AutoScaling.EventType.valueOf((String)map.get("eventType"));
+ Map<String, Object> properties = (Map<String, Object>)map.get("properties");
+ return new ReplayingEvent(eventType, source, eventNanoTime, properties);
+ } catch (Exception e) {
+ LOG.warn("Invalid event data, ignoring: " + new String(data));
+ continue;
+ }
+ }
+ } catch (KeeperException | InterruptedException e) {
+ LOG.warn("Exception polling queue of trigger " + triggerName, e);
+ }
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/24727071/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
index b0405cf..f916382 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeAddedTriggerTest.java
@@ -84,7 +84,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = eventRef.get();
assertNotNull(nodeAddedEvent);
- assertEquals("", newNode.getNodeName(), nodeAddedEvent.getNodeName());
+ assertEquals("", newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME));
}
// add a new node but remove it before the waitFor period expires
@@ -210,7 +210,7 @@ public class NodeAddedTriggerTest extends SolrCloudTestCase {
assertTrue(fired.get());
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = eventRef.get();
assertNotNull(nodeAddedEvent);
- assertEquals("", newNode.getNodeName(), nodeAddedEvent.getNodeName());
+ assertEquals("", newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME));
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/24727071/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
index efa63d3..6d2e4a7 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/NodeLostTriggerTest.java
@@ -85,7 +85,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
NodeLostTrigger.NodeLostEvent nodeLostEvent = eventRef.get();
assertNotNull(nodeLostEvent);
- assertEquals("", lostNodeName, nodeLostEvent.getNodeName());
+ assertEquals("", lostNodeName, nodeLostEvent.getProperty(NodeLostTrigger.NodeLostEvent.NODE_NAME));
}
@@ -235,7 +235,7 @@ public class NodeLostTriggerTest extends SolrCloudTestCase {
NodeLostTrigger.NodeLostEvent nodeLostEvent = eventRef.get();
assertNotNull(nodeLostEvent);
- assertEquals("", lostNodeName, nodeLostEvent.getNodeName());
+ assertEquals("", lostNodeName, nodeLostEvent.getProperty(NodeLostTrigger.NodeLostEvent.NODE_NAME));
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/24727071/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
index 545c0d6..34e8587 100644
--- a/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/autoscaling/TriggerIntegrationTest.java
@@ -202,16 +202,16 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
if (lastActionExecutedAt.get() != 0) {
log.info("last action at " + lastActionExecutedAt.get() + " nano time = " + System.nanoTime());
if (System.nanoTime() - lastActionExecutedAt.get() < TimeUnit.NANOSECONDS.convert(ScheduledTriggers.DEFAULT_MIN_MS_BETWEEN_ACTIONS - DELTA_MS, TimeUnit.MILLISECONDS)) {
- log.info("action executed again before minimum wait time from {}", event.getSource().getName());
+ log.info("action executed again before minimum wait time from {}", event.getSource());
fail("TriggerListener was fired before the throttling period");
}
}
if (onlyOnce.compareAndSet(false, true)) {
- log.info("action executed from {}", event.getSource().getName());
+ log.info("action executed from {}", event.getSource());
lastActionExecutedAt.set(System.nanoTime());
triggerFiredLatch.countDown();
} else {
- log.info("action executed more than once from {}", event.getSource().getName());
+ log.info("action executed more than once from {}", event.getSource());
fail("Trigger should not have fired more than once!");
}
} finally {
@@ -287,7 +287,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) eventRef.get();
assertNotNull(nodeLostEvent);
assertEquals("The node added trigger was fired but for a different node",
- nodeName, nodeLostEvent.getNodeName());
+ nodeName, nodeLostEvent.getProperty(NodeLostTrigger.NodeLostEvent.NODE_NAME));
}
@Test
@@ -345,7 +345,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
assertNotNull(nodeAddedEvent);
assertEquals("The node added trigger was fired but for a different node",
- newNode.getNodeName(), nodeAddedEvent.getNodeName());
+ newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME));
}
@Test
@@ -374,7 +374,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
NodeAddedTrigger.NodeAddedEvent nodeAddedEvent = (NodeAddedTrigger.NodeAddedEvent) eventRef.get();
assertNotNull(nodeAddedEvent);
assertEquals("The node added trigger was fired but for a different node",
- newNode.getNodeName(), nodeAddedEvent.getNodeName());
+ newNode.getNodeName(), nodeAddedEvent.getProperty(NodeAddedTrigger.NodeAddedEvent.NODE_NAME));
}
@Test
@@ -413,7 +413,7 @@ public class TriggerIntegrationTest extends SolrCloudTestCase {
NodeLostTrigger.NodeLostEvent nodeLostEvent = (NodeLostTrigger.NodeLostEvent) eventRef.get();
assertNotNull(nodeLostEvent);
assertEquals("The node lost trigger was fired but for a different node",
- lostNodeName, nodeLostEvent.getNodeName());
+ lostNodeName, nodeLostEvent.getProperty(NodeLostTrigger.NodeLostEvent.NODE_NAME));
}
public static class TestTriggerAction implements TriggerAction {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/24727071/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 0bcd9ca..45995b8 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -92,6 +92,7 @@ public class ZkStateReader implements Closeable {
public static final String REJOIN_AT_HEAD_PROP = "rejoinAtHead";
public static final String SOLR_SECURITY_CONF_PATH = "/security.json";
public static final String SOLR_AUTOSCALING_CONF_PATH = "/autoscaling.json";
+ public static final String SOLR_AUTOSCALING_EVENTS_PATH = "/autoscaling/events";
public static final String REPLICATION_FACTOR = "replicationFactor";
public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";