You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/08/19 14:52:05 UTC
[lucene-solr] branch jira/solr-14749 updated: SOLR-14749: Provide a
POC implementation of ClusterEventProducer, support scheduled events.
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch jira/solr-14749
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/jira/solr-14749 by this push:
new 784456d SOLR-14749: Provide a POC implementation of ClusterEventProducer, support scheduled events.
784456d is described below
commit 784456d7e19171c3a70738745bc4bd05d8da210f
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Wed Aug 19 16:51:11 2020 +0200
SOLR-14749: Provide a POC implementation of ClusterEventProducer, support
scheduled events.
---
.../solr/cluster/events/ClusterEventListener.java | 9 +-
.../solr/cluster/events/ClusterEventProducer.java | 42 +++-
.../org/apache/solr/cluster/events/Schedule.java | 5 +-
.../cluster/events/ScheduledEventListener.java | 13 +
.../events/impl/ClusterEventProducerImpl.java | 269 ++++++++++++++++++++-
5 files changed, 313 insertions(+), 25 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java
index 43c6d12..adf2e2b 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java
@@ -26,9 +26,16 @@ import java.util.Set;
*/
public interface ClusterEventListener {
- // reports types of events that this listener is interested in
+ /**
+ * The types of events that this listener can process.
+ */
Set<ClusterEvent.EventType> getEventTypes();
+ /**
+ * Handle the event. Implementations should be non-blocking - if any long
+ * processing is needed it should be performed asynchronously.
+ * @param event cluster event
+ */
void onEvent(ClusterEvent event);
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java
index 71cc25d..a46703d 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java
@@ -16,29 +16,51 @@
*/
package org.apache.solr.cluster.events;
-import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Component that produces {@link ClusterEvent} instances.
*/
public interface ClusterEventProducer {
- // XXX should we have an option to register only for particular types of
- // events (the producer would have to filter them per listener)?
- // if not then each listener will get all event types and will have to
- // filter itself, which is cumbersome
- // See also ClusterEventListener.getEventTypes()
- void registerListener(ClusterEventListener listener);
+ /**
+ * Returns a modifiable map of event types and listeners to process events
+ * of a given type.
+ */
+ Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners();
- void unregisterListener(ClusterEventListener listener);
+ /**
+ * Register an event listener. This listener will be notified about events
+ * of the types that it declares in {@link ClusterEventListener#getEventTypes()}
+ * @param listener non-null listener. If the same instance of the listener is
+ * already registered it will be ignored.
+ */
+ default void registerListener(ClusterEventListener listener) throws Exception {
+ listener.getEventTypes().forEach(type -> {
+ Set<ClusterEventListener> perType = getEventListeners().computeIfAbsent(type, t -> ConcurrentHashMap.newKeySet());
+ perType.add(listener);
+ });
+ }
- Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners();
+ /**
+ * Unregister an event listener.
+ * @param listener non-null listener.
+ */
+ default void unregisterListener(ClusterEventListener listener) {
+ listener.getEventTypes().forEach(type ->
+ getEventListeners().getOrDefault(type, Collections.emptySet()).remove(listener)
+ );
+ }
+ /**
+ * Fire an event. This method will call registered listeners that subscribed to the
+ * type of event being passed.
+ * @param event cluster event
+ */
default void fireEvent(ClusterEvent event) {
- // XXX filter here by acceptable event types per listener?
getEventListeners().getOrDefault(event.getType(), Collections.emptySet())
.forEach(listener -> listener.onEvent(event));
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/Schedule.java b/solr/core/src/java/org/apache/solr/cluster/events/Schedule.java
index 2205719..d9afc5e 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/Schedule.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/Schedule.java
@@ -23,9 +23,12 @@ public interface Schedule {
String getName();
- // absolute or date math expr?
+ // date math expr
String getStartTime();
+ // may be null only if getStartTime contained TZ
+ String getTimeZone();
+
// date math expr
String getInterval();
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ScheduledEventListener.java b/solr/core/src/java/org/apache/solr/cluster/events/ScheduledEventListener.java
new file mode 100644
index 0000000..2537364
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ScheduledEventListener.java
@@ -0,0 +1,13 @@
+package org.apache.solr.cluster.events;
+
+/**
+ *
+ */
+public interface ScheduledEventListener extends ClusterEventListener {
+
+ /**
+ * Return the schedule that this listener needs. This is used when registering the
+ * listener to properly configure the scheduler.
+ */
+ Schedule getSchedule();
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
index 485340e..c9fbd64 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
@@ -16,54 +16,297 @@
*/
package org.apache.solr.cluster.events.impl;
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.text.ParseException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
+import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cluster.events.ClusterEvent;
import org.apache.solr.cluster.events.ClusterEventListener;
import org.apache.solr.cluster.events.ClusterEventProducer;
+import org.apache.solr.cluster.events.NodeDownEvent;
+import org.apache.solr.cluster.events.NodeUpEvent;
+import org.apache.solr.cluster.events.Schedule;
+import org.apache.solr.cluster.events.ScheduledEvent;
+import org.apache.solr.cluster.events.ScheduledEventListener;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.LiveNodesListener;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.core.CoreContainer;
+import org.apache.solr.util.DateMathParser;
+import org.apache.solr.util.TimeZoneUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- *
+ * Implementation of {@link ClusterEventProducer}.
+ * <h3>Implementation notes</h3>
+ * <p>For each cluster event relevant listeners are always invoked sequentially
+ * (not in parallel) and in arbitrary order. This means that if any listener blocks the
+ * processing other listeners may be invoked much later or not at all.</p>
+ * <p>Scheduled events are triggered at most with {@link #SCHEDULE_INTERVAL_SEC} interval. See also above note
+ * on the sequential processing. If the total time of execution exceeds any schedule
+ * interval such events will be silently missed and they will be invoked only when the
+ * next event will be generated.</p>
*/
-public class ClusterEventProducerImpl implements ClusterEventProducer {
+public class ClusterEventProducerImpl implements ClusterEventProducer, Closeable {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static final int SCHEDULE_INTERVAL_SEC = 10;
private final Map<ClusterEvent.EventType, Set<ClusterEventListener>> listeners = new HashMap<>();
+ private final Map<String, CompiledSchedule> schedules = new ConcurrentHashMap<>();
private final CoreContainer cc;
+ private final LiveNodesListener liveNodesListener;
+ private final ZkController zkController;
+ private final ScheduledExecutorService scheduler;
+
+ private final Set<ClusterEvent.EventType> supportedEvents =
+ new HashSet<>() {{
+ add(ClusterEvent.EventType.NODE_DOWN);
+ add(ClusterEvent.EventType.NODE_UP);
+ add(ClusterEvent.EventType.SCHEDULED);
+ }};
+
+ private volatile boolean isClosed = false;
+
+ private class CompiledSchedule {
+ final String name;
+ final TimeZone timeZone;
+ final Instant startTime;
+ final String interval;
+ final DateMathParser dateMathParser;
+
+ Instant lastRunAt;
+
+ CompiledSchedule(Schedule schedule) throws Exception {
+ this.name = schedule.getName();
+ this.timeZone = TimeZoneUtils.getTimeZone(schedule.getTimeZone());
+ this.startTime = parseStartTime(new Date(), schedule.getStartTime(), timeZone);
+ this.lastRunAt = startTime;
+ this.interval = schedule.getInterval();
+ this.dateMathParser = new DateMathParser(timeZone);
+ }
+
+ private Instant parseStartTime(Date now, String startTimeStr, TimeZone timeZone) throws Exception {
+ try {
+ // try parsing startTime as an ISO-8601 date time string
+ return DateMathParser.parseMath(now, startTimeStr).toInstant();
+ } catch (SolrException e) {
+ if (e.code() != SolrException.ErrorCode.BAD_REQUEST.code) {
+ throw new Exception("startTime: error parsing value '" + startTimeStr + "': " + e.toString());
+ }
+ }
+ DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder()
+ .append(DateTimeFormatter.ISO_LOCAL_DATE).appendPattern("['T'[HH[:mm[:ss]]]]")
+ .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
+ .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
+ .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
+ .toFormatter(Locale.ROOT).withZone(timeZone.toZoneId());
+ try {
+ return Instant.from(dateTimeFormatter.parse(startTimeStr));
+ } catch (Exception e) {
+ throw new Exception("startTime: error parsing startTime '" + startTimeStr + "': " + e.toString());
+ }
+ }
+
+ boolean shouldRun() {
+ dateMathParser.setNow(new Date(lastRunAt.toEpochMilli()));
+ Instant nextRunTime;
+ try {
+ Date next = dateMathParser.parseMath(interval);
+ nextRunTime = next.toInstant();
+ } catch (ParseException e) {
+ log.warn("Invalid math expression, skipping: " + e);
+ return false;
+ }
+ if (Instant.now().isAfter(nextRunTime)) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ void setLastRunAt(Instant lastRunAt) {
+ this.lastRunAt = lastRunAt;
+ }
+ }
public ClusterEventProducerImpl(CoreContainer coreContainer) {
this.cc = coreContainer;
- ZkController zkController = this.cc.getZkController();
+ this.zkController = this.cc.getZkController();
if (zkController == null) {
+ liveNodesListener = null;
+ scheduler = null;
return;
}
- // XXX register liveNodesListener
- // XXX register collection state listener
+ // register liveNodesListener
+ liveNodesListener = (oldNodes, newNodes) -> {
+ // already closed but still registered
+ if (isClosed) {
+ // remove the listener
+ return true;
+ }
+ // spurious event, ignore but keep listening
+ if (oldNodes.equals(newNodes)) {
+ return false;
+ }
+ oldNodes.forEach(oldNode -> {
+ if (!newNodes.contains(oldNode)) {
+ fireEvent(new NodeDownEvent() {
+ final Instant timestamp = Instant.now();
+ @Override
+ public Instant getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public String getNodeName() {
+ return oldNode;
+ }
+ });
+ }
+ });
+ newNodes.forEach(newNode -> {
+ if (!oldNodes.contains(newNode)) {
+ fireEvent(new NodeUpEvent() {
+ final Instant timestamp = Instant.now();
+ @Override
+ public Instant getTimestamp() {
+ return timestamp;
+ }
+ @Override
+ public String getNodeName() {
+ return newNode;
+ }
+ });
+ }
+ });
+ return false;
+ };
+
+ // XXX register collection state listener?
+ // XXX not sure how to efficiently monitor for REPLICA_DOWN events
+
+ // create scheduler
+ scheduler = Executors.newSingleThreadScheduledExecutor(new SolrNamedThreadFactory("cluster-event-scheduler"));
+ scheduler.schedule(() -> maybeFireScheduledEvent(), SCHEDULE_INTERVAL_SEC, TimeUnit.SECONDS);
}
- @Override
- public void registerListener(ClusterEventListener listener) {
- listener.getEventTypes().forEach(type -> {
- Set<ClusterEventListener> perType = listeners.computeIfAbsent(type, t -> ConcurrentHashMap.newKeySet());
- perType.add(listener);
+ private void ensureNotClosed() {
+ if (isClosed) {
+ throw new RuntimeException("ClusterEventProducerImpl already closed");
+ }
+ }
+
+ private void maybeFireScheduledEvent() {
+ ensureNotClosed();
+ Set<ClusterEventListener> scheduledListeners = listeners.getOrDefault(ClusterEvent.EventType.SCHEDULED, Collections.emptySet());
+ if (scheduledListeners.isEmpty()) {
+ return;
+ }
+ scheduledListeners.forEach(listener -> {
+ ScheduledEventListener scheduledEventListener = (ScheduledEventListener) listener;
+ Schedule schedule = scheduledEventListener.getSchedule();
+ CompiledSchedule compiledSchedule = schedules.get(schedule.getName());
+ if (compiledSchedule == null) { // ???
+ return;
+ }
+ if (compiledSchedule.shouldRun()) {
+ Instant now = Instant.now();
+ ClusterEvent event = new ScheduledEvent() {
+ @Override
+ public Schedule getSchedule() {
+ return schedule;
+ }
+
+ @Override
+ public Instant getTimestamp() {
+ return now;
+ }
+ };
+ listener.onEvent(event);
+ compiledSchedule.setLastRunAt(now);
+ }
});
}
+
+ @Override
+ public void registerListener(ClusterEventListener listener) throws Exception {
+ ensureNotClosed();
+ try {
+ listener.getEventTypes().forEach(type -> {
+ if (!supportedEvents.contains(type)) {
+ throw new RuntimeException("event type " + type + " not supported yet");
+ }
+ });
+ } catch (Throwable e) {
+ throw new Exception(e);
+ }
+ CompiledSchedule compiledSchedule = null;
+ if (listener.getEventTypes().contains(ClusterEvent.EventType.SCHEDULED)) {
+ if (!(listener instanceof ScheduledEventListener)) {
+ throw new Exception("listener " + listener + " wants to process " +
+ ClusterEvent.EventType.SCHEDULED + " events but is not a " +
+ ScheduledEventListener.class.getSimpleName());
+ } else {
+ compiledSchedule = new CompiledSchedule(((ScheduledEventListener) listener).getSchedule());
+ }
+ }
+ ClusterEventProducer.super.registerListener(listener);
+ if (compiledSchedule != null) {
+ schedules.put(compiledSchedule.name, compiledSchedule);
+ }
+ }
+
@Override
public void unregisterListener(ClusterEventListener listener) {
- listener.getEventTypes().forEach(type ->
- listeners.getOrDefault(type, Collections.emptySet()).remove(listener)
- );
+ if (listener.getEventTypes().contains(ClusterEvent.EventType.SCHEDULED)) {
+ schedules.remove(((ScheduledEventListener) listener).getSchedule().getName());
+ }
+ ClusterEventProducer.super.unregisterListener(listener);
+ }
+
+ @Override
+ public void close() throws IOException {
+ isClosed = true;
+ if (liveNodesListener != null) {
+ zkController.zkStateReader.removeLiveNodesListener(liveNodesListener);
+ }
+ if (scheduler != null) {
+ scheduler.shutdownNow();
+ try {
+ scheduler.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.warn("Interrupted while waiting for the scheduler to shut down, ignoring");
+ }
+ }
+ schedules.clear();
+ listeners.clear();
}
@Override
public Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners() {
+ ensureNotClosed();
return listeners;
}
}