You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/08/19 14:52:05 UTC

[lucene-solr] branch jira/solr-14749 updated: SOLR-14749: Provide a POC implementation of ClusterEventProducer, support scheduled events.

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch jira/solr-14749
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/jira/solr-14749 by this push:
     new 784456d  SOLR-14749: Provide a POC implementation of ClusterEventProducer, support scheduled events.
784456d is described below

commit 784456d7e19171c3a70738745bc4bd05d8da210f
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Wed Aug 19 16:51:11 2020 +0200

    SOLR-14749: Provide a POC implementation of ClusterEventProducer, support
    scheduled events.
---
 .../solr/cluster/events/ClusterEventListener.java  |   9 +-
 .../solr/cluster/events/ClusterEventProducer.java  |  42 +++-
 .../org/apache/solr/cluster/events/Schedule.java   |   5 +-
 .../cluster/events/ScheduledEventListener.java     |  13 +
 .../events/impl/ClusterEventProducerImpl.java      | 269 ++++++++++++++++++++-
 5 files changed, 313 insertions(+), 25 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java
index 43c6d12..adf2e2b 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java
@@ -26,9 +26,16 @@ import java.util.Set;
  */
 public interface ClusterEventListener {
 
-  // reports types of events that this listener is interested in
+  /**
+   * The types of events that this listener can process.
+   */
   Set<ClusterEvent.EventType> getEventTypes();
 
+  /**
+   * Handle the event. Implementations should be non-blocking - if any long
+   * processing is needed it should be performed asynchronously.
+   * @param event cluster event
+   */
   void onEvent(ClusterEvent event);
 
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java
index 71cc25d..a46703d 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java
@@ -16,29 +16,51 @@
  */
 package org.apache.solr.cluster.events;
 
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * Component that produces {@link ClusterEvent} instances.
  */
 public interface ClusterEventProducer {
 
-  // XXX should we have an option to register only for particular types of
-  // events (the producer would have to filter them per listener)?
-  // if not then each listener will get all event types and will have to
-  // filter itself, which is cumbersome
-  // See also ClusterEventListener.getEventTypes()
-  void registerListener(ClusterEventListener listener);
+  /**
+   * Returns a modifiable map of event types and listeners to process events
+   * of a given type.
+   */
+  Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners();
 
-  void unregisterListener(ClusterEventListener listener);
+  /**
+   * Register an event listener. This listener will be notified about events
+   * of the types that it declares in {@link ClusterEventListener#getEventTypes()}.
+   * @param listener non-null listener. If the same instance of the listener is
+   *                 already registered it will be ignored.
+   */
+  default void registerListener(ClusterEventListener listener) throws Exception {
+    listener.getEventTypes().forEach(type -> {
+      Set<ClusterEventListener> perType = getEventListeners().computeIfAbsent(type, t -> ConcurrentHashMap.newKeySet());
+      perType.add(listener);
+    });
+  }
 
-  Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners();
+  /**
+   * Unregister an event listener.
+   * @param listener non-null listener.
+   */
+  default void unregisterListener(ClusterEventListener listener) {
+    listener.getEventTypes().forEach(type ->
+        getEventListeners().getOrDefault(type, Collections.emptySet()).remove(listener)
+    );
+  }
 
+  /**
+   * Fire an event. This method will call registered listeners that subscribed to the
+   * type of event being passed.
+   * @param event cluster event
+   */
   default void fireEvent(ClusterEvent event) {
-    // XXX filter here by acceptable event types per listener?
     getEventListeners().getOrDefault(event.getType(), Collections.emptySet())
         .forEach(listener -> listener.onEvent(event));
   }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/Schedule.java b/solr/core/src/java/org/apache/solr/cluster/events/Schedule.java
index 2205719..d9afc5e 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/Schedule.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/Schedule.java
@@ -23,9 +23,12 @@ public interface Schedule {
 
   String getName();
 
-  // absolute or date math expr?
+  // date math expr
   String getStartTime();
 
+  // may be null only if getStartTime contained TZ
+  String getTimeZone();
+
   // date math expr
   String getInterval();
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ScheduledEventListener.java b/solr/core/src/java/org/apache/solr/cluster/events/ScheduledEventListener.java
new file mode 100644
index 0000000..2537364
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ScheduledEventListener.java
@@ -0,0 +1,13 @@
+package org.apache.solr.cluster.events;
+
+/**
+ * A {@link ClusterEventListener} that additionally declares the {@link Schedule} it needs.
+ */
+public interface ScheduledEventListener extends ClusterEventListener {
+
+  /**
+   * Return the schedule that this listener needs. This is used when registering the
+   * listener to properly configure the scheduler.
+   */
+  Schedule getSchedule();
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
index 485340e..c9fbd64 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
@@ -16,54 +16,297 @@
  */
 package org.apache.solr.cluster.events.impl;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.text.ParseException;
+import java.time.Instant;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cluster.events.ClusterEvent;
 import org.apache.solr.cluster.events.ClusterEventListener;
 import org.apache.solr.cluster.events.ClusterEventProducer;
+import org.apache.solr.cluster.events.NodeDownEvent;
+import org.apache.solr.cluster.events.NodeUpEvent;
+import org.apache.solr.cluster.events.Schedule;
+import org.apache.solr.cluster.events.ScheduledEvent;
+import org.apache.solr.cluster.events.ScheduledEventListener;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.LiveNodesListener;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.core.CoreContainer;
+import org.apache.solr.util.DateMathParser;
+import org.apache.solr.util.TimeZoneUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- *
+ * Implementation of {@link ClusterEventProducer}.
+ * <h3>Implementation notes</h3>
+ * <p>For each cluster event, relevant listeners are always invoked sequentially
+ * (not in parallel) and in arbitrary order. This means that if any listener blocks the
+ * processing, other listeners may be invoked much later or not at all.</p>
+ * <p>Scheduled events are triggered at most once per {@link #SCHEDULE_INTERVAL_SEC} seconds. See also the note above
+ * on sequential processing. If the total execution time exceeds any schedule
+ * interval, such events will be silently missed and will be invoked only when
+ * the next event is generated.</p>
  */
-public class ClusterEventProducerImpl implements ClusterEventProducer {
+public class ClusterEventProducerImpl implements ClusterEventProducer, Closeable {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public static final int SCHEDULE_INTERVAL_SEC = 10;
 
   private final Map<ClusterEvent.EventType, Set<ClusterEventListener>> listeners = new HashMap<>();
+  private final Map<String, CompiledSchedule> schedules = new ConcurrentHashMap<>();
   private final CoreContainer cc;
+  private final LiveNodesListener liveNodesListener;
+  private final ZkController zkController;
+  private final ScheduledExecutorService scheduler;
+
+  private final Set<ClusterEvent.EventType> supportedEvents =
+      new HashSet<>() {{
+        add(ClusterEvent.EventType.NODE_DOWN);
+        add(ClusterEvent.EventType.NODE_UP);
+        add(ClusterEvent.EventType.SCHEDULED);
+      }};
+
+  private volatile boolean isClosed = false;
+
+  private class CompiledSchedule {
+    final String name;
+    final TimeZone timeZone;
+    final Instant startTime;
+    final String interval;
+    final DateMathParser dateMathParser;
+
+    Instant lastRunAt;
+
+    CompiledSchedule(Schedule schedule) throws Exception {
+      this.name = schedule.getName();
+      this.timeZone = TimeZoneUtils.getTimeZone(schedule.getTimeZone());
+      this.startTime = parseStartTime(new Date(), schedule.getStartTime(), timeZone);
+      this.lastRunAt = startTime;
+      this.interval = schedule.getInterval();
+      this.dateMathParser = new DateMathParser(timeZone);
+    }
+
+    private Instant parseStartTime(Date now, String startTimeStr, TimeZone timeZone) throws Exception {
+      try {
+        // try parsing startTime as an ISO-8601 date time string
+        return DateMathParser.parseMath(now, startTimeStr).toInstant();
+      } catch (SolrException e) {
+        if (e.code() != SolrException.ErrorCode.BAD_REQUEST.code) {
+          throw new Exception("startTime: error parsing value '" + startTimeStr + "': " + e.toString());
+        }
+      }
+      DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder()
+          .append(DateTimeFormatter.ISO_LOCAL_DATE).appendPattern("['T'[HH[:mm[:ss]]]]")
+          .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
+          .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
+          .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
+          .toFormatter(Locale.ROOT).withZone(timeZone.toZoneId());
+      try {
+        return Instant.from(dateTimeFormatter.parse(startTimeStr));
+      } catch (Exception e) {
+        throw new Exception("startTime: error parsing startTime '" + startTimeStr + "': " + e.toString());
+      }
+    }
+
+    boolean shouldRun() {
+      dateMathParser.setNow(new Date(lastRunAt.toEpochMilli()));
+      Instant nextRunTime;
+      try {
+        Date next = dateMathParser.parseMath(interval);
+        nextRunTime = next.toInstant();
+      } catch (ParseException e) {
+        log.warn("Invalid math expression, skipping: " + e);
+        return false;
+      }
+      if (Instant.now().isAfter(nextRunTime)) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    void setLastRunAt(Instant lastRunAt) {
+      this.lastRunAt = lastRunAt;
+    }
+  }
 
   public ClusterEventProducerImpl(CoreContainer coreContainer) {
     this.cc = coreContainer;
-    ZkController zkController = this.cc.getZkController();
+    this.zkController = this.cc.getZkController();
     if (zkController == null) {
+      liveNodesListener = null;
+      scheduler = null;
       return;
     }
-    // XXX register liveNodesListener
 
-    // XXX register collection state listener
+    // register liveNodesListener
+    liveNodesListener = (oldNodes, newNodes) -> {
+      // already closed but still registered
+      if (isClosed) {
+        // remove the listener
+        return true;
+      }
+      // spurious event, ignore but keep listening
+      if (oldNodes.equals(newNodes)) {
+        return false;
+      }
+      oldNodes.forEach(oldNode -> {
+        if (!newNodes.contains(oldNode)) {
+          fireEvent(new NodeDownEvent() {
+            final Instant timestamp = Instant.now();
+            @Override
+            public Instant getTimestamp() {
+              return timestamp;
+            }
+
+            @Override
+            public String getNodeName() {
+              return oldNode;
+            }
+          });
+        }
+      });
+      newNodes.forEach(newNode -> {
+        if (!oldNodes.contains(newNode)) {
+          fireEvent(new NodeUpEvent() {
+            final Instant timestamp = Instant.now();
+            @Override
+            public Instant getTimestamp() {
+              return timestamp;
+            }
+            @Override
+            public String getNodeName() {
+              return newNode;
+            }
+          });
+        }
+      });
+      return false;
+    };
+
+    // XXX register collection state listener?
+    // XXX not sure how to efficiently monitor for REPLICA_DOWN events
+
+    // create scheduler
+    scheduler = Executors.newSingleThreadScheduledExecutor(new SolrNamedThreadFactory("cluster-event-scheduler"));
+    scheduler.schedule(() -> maybeFireScheduledEvent(), SCHEDULE_INTERVAL_SEC, TimeUnit.SECONDS);
   }
 
-  @Override
-  public void registerListener(ClusterEventListener listener) {
-    listener.getEventTypes().forEach(type -> {
-      Set<ClusterEventListener> perType = listeners.computeIfAbsent(type, t -> ConcurrentHashMap.newKeySet());
-      perType.add(listener);
+  private void ensureNotClosed() {
+    if (isClosed) {
+      throw new RuntimeException("ClusterEventProducerImpl already closed");
+    }
+  }
+
+  private void maybeFireScheduledEvent() {
+    ensureNotClosed();
+    Set<ClusterEventListener> scheduledListeners = listeners.getOrDefault(ClusterEvent.EventType.SCHEDULED, Collections.emptySet());
+    if (scheduledListeners.isEmpty()) {
+      return;
+    }
+    scheduledListeners.forEach(listener -> {
+      ScheduledEventListener scheduledEventListener = (ScheduledEventListener) listener;
+      Schedule schedule = scheduledEventListener.getSchedule();
+      CompiledSchedule compiledSchedule = schedules.get(schedule.getName());
+      if (compiledSchedule == null) { // ???
+        return;
+      }
+      if (compiledSchedule.shouldRun()) {
+        Instant now = Instant.now();
+        ClusterEvent event = new ScheduledEvent() {
+          @Override
+          public Schedule getSchedule() {
+            return schedule;
+          }
+
+          @Override
+          public Instant getTimestamp() {
+            return now;
+          }
+        };
+        listener.onEvent(event);
+        compiledSchedule.setLastRunAt(now);
+      }
     });
   }
 
+
+  @Override
+  public void registerListener(ClusterEventListener listener) throws Exception {
+    ensureNotClosed();
+    try {
+      listener.getEventTypes().forEach(type -> {
+        if (!supportedEvents.contains(type)) {
+          throw new RuntimeException("event type " + type + " not supported yet");
+        }
+      });
+    } catch (Throwable e) {
+      throw new Exception(e);
+    }
+    CompiledSchedule compiledSchedule = null;
+    if (listener.getEventTypes().contains(ClusterEvent.EventType.SCHEDULED)) {
+      if (!(listener instanceof ScheduledEventListener)) {
+        throw new Exception("listener " + listener + " wants to process " +
+            ClusterEvent.EventType.SCHEDULED + " events but is not a " +
+            ScheduledEventListener.class.getSimpleName());
+      } else {
+        compiledSchedule = new CompiledSchedule(((ScheduledEventListener) listener).getSchedule());
+      }
+    }
+    ClusterEventProducer.super.registerListener(listener);
+    if (compiledSchedule != null) {
+      schedules.put(compiledSchedule.name, compiledSchedule);
+    }
+  }
+
   @Override
   public void unregisterListener(ClusterEventListener listener) {
-    listener.getEventTypes().forEach(type ->
-      listeners.getOrDefault(type, Collections.emptySet()).remove(listener)
-    );
+    if (listener.getEventTypes().contains(ClusterEvent.EventType.SCHEDULED)) {
+      schedules.remove(((ScheduledEventListener) listener).getSchedule().getName());
+    }
+    ClusterEventProducer.super.unregisterListener(listener);
+  }
+
+  @Override
+  public void close() throws IOException {
+    isClosed = true;
+    if (liveNodesListener != null) {
+      zkController.zkStateReader.removeLiveNodesListener(liveNodesListener);
+    }
+    if (scheduler != null) {
+      scheduler.shutdownNow();
+      try {
+        scheduler.awaitTermination(60, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        log.warn("Interrupted while waiting for the scheduler to shut down, ignoring");
+      }
+    }
+    schedules.clear();
+    listeners.clear();
   }
 
   @Override
   public Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners() {
+    ensureNotClosed();
     return listeners;
   }
 }