You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/10/23 00:05:57 UTC

[36/52] [abbrv] [partial] lucene-solr:jira/gradle: Add gradle support for Solr

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
deleted file mode 100644
index 052b4c4..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud.autoscaling;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.net.ConnectException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
-import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
-import org.apache.solr.client.solrj.cloud.DistribStateManager;
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
-import org.apache.solr.common.SolrCloseable;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.IOUtils;
-import org.apache.solr.common.util.Utils;
-import org.apache.solr.core.CloudConfig;
-import org.apache.solr.core.SolrResourceLoader;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
-
-/**
- * Overseer thread responsible for reading triggers from zookeeper and
- * adding/removing them from {@link ScheduledTriggers}
- */
-public class OverseerTriggerThread implements Runnable, SolrCloseable {
-
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private final SolrCloudManager cloudManager;
-
-  private final CloudConfig cloudConfig;
-
-  private final ScheduledTriggers scheduledTriggers;
-
-  private final AutoScaling.TriggerFactory triggerFactory;
-
-  private final ReentrantLock updateLock = new ReentrantLock();
-
-  private final Condition updated = updateLock.newCondition();
-
-  /*
-  Following variables are only accessed or modified when updateLock is held
-   */
-  private int znodeVersion = -1;
-
-  private Map<String, AutoScaling.Trigger> activeTriggers = new HashMap<>();
-
-  private volatile boolean isClosed = false;
-
-  private AutoScalingConfig autoScalingConfig;
-
-  public OverseerTriggerThread(SolrResourceLoader loader, SolrCloudManager cloudManager, CloudConfig cloudConfig) {
-    this.cloudManager = cloudManager;
-    this.cloudConfig = cloudConfig;
-    scheduledTriggers = new ScheduledTriggers(loader, cloudManager);
-    triggerFactory = new AutoScaling.TriggerFactoryImpl(loader, cloudManager);
-  }
-
-  @Override
-  public void close() throws IOException {
-    updateLock.lock();
-    try {
-      isClosed = true;
-      activeTriggers.clear();
-      updated.signalAll();
-    } finally {
-      updateLock.unlock();
-    }
-    IOUtils.closeQuietly(triggerFactory);
-    IOUtils.closeQuietly(scheduledTriggers);
-    log.debug("OverseerTriggerThread has been closed explicitly");
-  }
-
-  /**
-   * For tests.
-   * @lucene.internal
-   * @return current {@link ScheduledTriggers} instance
-   */
-  public ScheduledTriggers getScheduledTriggers() {
-    return scheduledTriggers;
-  }
-
-  @Override
-  public boolean isClosed() {
-    return isClosed;
-  }
-
-  @Override
-  public void run() {
-    int lastZnodeVersion = znodeVersion;
-
-    // we automatically add a trigger for auto add replicas if it does not exists already
-    // we also automatically add a scheduled maintenance trigger
-    while (!isClosed)  {
-      try {
-        if (Thread.currentThread().isInterrupted()) {
-          log.warn("Interrupted");
-          break;
-        }
-        AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
-        AutoScalingConfig updatedConfig = withAutoAddReplicasTrigger(autoScalingConfig);
-        updatedConfig = withScheduledMaintenanceTrigger(updatedConfig);
-        if (updatedConfig.equals(autoScalingConfig)) break;
-        log.debug("Adding .auto_add_replicas and .scheduled_maintenance triggers");
-        cloudManager.getDistribStateManager().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(updatedConfig), updatedConfig.getZkVersion());
-        break;
-      } catch (BadVersionException bve) {
-        // somebody else has changed the configuration so we must retry
-      } catch (InterruptedException e) {
-        // Restore the interrupted status
-        Thread.currentThread().interrupt();
-        log.warn("Interrupted", e);
-        break;
-      }
-      catch (IOException | KeeperException e) {
-        if (e instanceof KeeperException.SessionExpiredException ||
-            (e.getCause()!=null && e.getCause() instanceof KeeperException.SessionExpiredException)) {
-          log.warn("Solr cannot talk to ZK, exiting " + 
-              getClass().getSimpleName() + " main queue loop", e);
-          return;
-        } else {
-          log.error("A ZK error has occurred", e);
-        }
-      }
-    }
-
-    if (isClosed || Thread.currentThread().isInterrupted())  return;
-
-    try {
-      refreshAutoScalingConf(new AutoScalingWatcher());
-    } catch (ConnectException e) {
-      log.warn("ZooKeeper watch triggered for autoscaling conf, but Solr cannot talk to ZK: [{}]", e.getMessage());
-    } catch (InterruptedException e) {
-      // Restore the interrupted status
-      Thread.currentThread().interrupt();
-      log.warn("Interrupted", e);
-    } catch (Exception e)  {
-      log.error("Unexpected exception", e);
-    }
-
-    while (true) {
-      Map<String, AutoScaling.Trigger> copy = null;
-      try {
-        // this can throw InterruptedException and we don't want to unlock if it did, so we keep this outside
-        // of the try/finally block
-        updateLock.lockInterruptibly();
-
-        // must check for close here before we await on the condition otherwise we can only be woken up on interruption
-        if (isClosed) {
-          log.warn("OverseerTriggerThread has been closed, exiting.");
-          break;
-        }
-
-        log.debug("Current znodeVersion {}, lastZnodeVersion {}", znodeVersion, lastZnodeVersion);
-
-        try {
-          if (znodeVersion == lastZnodeVersion) {
-            updated.await();
-
-            // are we closed?
-            if (isClosed) {
-              log.warn("OverseerTriggerThread woken up but we are closed, exiting.");
-              break;
-            }
-
-            // spurious wakeup?
-            if (znodeVersion == lastZnodeVersion) continue;
-          }
-          copy = new HashMap<>(activeTriggers);
-          lastZnodeVersion = znodeVersion;
-          log.debug("Processed trigger updates upto znodeVersion {}", znodeVersion);
-        } catch (InterruptedException e) {
-          // Restore the interrupted status
-          Thread.currentThread().interrupt();
-          log.warn("Interrupted", e);
-          break;
-        } finally {
-          updateLock.unlock();
-        }
-      } catch (InterruptedException e) {
-        // Restore the interrupted status
-        Thread.currentThread().interrupt();
-        log.warn("Interrupted", e);
-        break;
-      }
-
-      // update the current config
-      scheduledTriggers.setAutoScalingConfig(autoScalingConfig);
-
-      Set<String> managedTriggerNames = scheduledTriggers.getScheduledTriggerNames();
-      // remove the triggers which are no longer active
-      for (String managedTriggerName : managedTriggerNames) {
-        if (!copy.containsKey(managedTriggerName)) {
-          scheduledTriggers.remove(managedTriggerName);
-        }
-      }
-      // check for nodeLost triggers in the current config, and if
-      // absent then clean up old nodeLost / nodeAdded markers
-      boolean cleanOldNodeLostMarkers = true;
-      boolean cleanOldNodeAddedMarkers = true;
-      try {
-        // add new triggers and/or replace and close the replaced triggers
-        for (Map.Entry<String, AutoScaling.Trigger> entry : copy.entrySet()) {
-          if (entry.getValue().getEventType().equals(TriggerEventType.NODELOST)) {
-            cleanOldNodeLostMarkers = false;
-          }
-          if (entry.getValue().getEventType().equals(TriggerEventType.NODEADDED)) {
-            cleanOldNodeAddedMarkers = false;
-          }
-          try {
-            scheduledTriggers.add(entry.getValue());
-          } catch (Exception e) {
-            log.warn("Exception initializing trigger " + entry.getKey() + ", configuration ignored", e);
-          }
-        }
-      } catch (AlreadyClosedException e) {
-        // this _should_ mean that we're closing, complain loudly if that's not the case
-        if (isClosed) {
-          return;
-        } else {
-          throw new IllegalStateException("Caught AlreadyClosedException from ScheduledTriggers, but we're not closed yet!", e);
-        }
-      }
-      DistribStateManager stateManager = cloudManager.getDistribStateManager();
-      if (cleanOldNodeLostMarkers) {
-        log.debug("-- clean old nodeLost markers");
-        try {
-          List<String> markers = stateManager.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
-          markers.forEach(n -> {
-            removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, n);
-          });
-        } catch (NoSuchElementException e) {
-          // ignore
-        } catch (Exception e) {
-          log.warn("Error removing old nodeLost markers", e);
-        }
-      }
-      if (cleanOldNodeAddedMarkers) {
-        log.debug("-- clean old nodeAdded markers");
-        try {
-          List<String> markers = stateManager.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
-          markers.forEach(n -> {
-            removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, n);
-          });
-        } catch (NoSuchElementException e) {
-          // ignore
-        } catch (Exception e) {
-          log.warn("Error removing old nodeAdded markers", e);
-        }
-
-      }
-    }
-  }
-
-  private void removeNodeMarker(String path, String nodeName) {
-    path = path + "/" + nodeName;
-    try {
-      cloudManager.getDistribStateManager().removeData(path, -1);
-      log.debug("  -- deleted " + path);
-    } catch (NoSuchElementException e) {
-      // ignore
-    } catch (Exception e) {
-      log.warn("Error removing old marker " + path, e);
-    }
-  }
-
-  class AutoScalingWatcher implements Watcher  {
-    @Override
-    public void process(WatchedEvent watchedEvent) {
-      // session events are not change events, and do not remove the watcher
-      if (Event.EventType.None.equals(watchedEvent.getType())) {
-        return;
-      }
-
-      try {
-        refreshAutoScalingConf(this);
-      } catch (ConnectException e) {
-        log.warn("ZooKeeper watch triggered for autoscaling conf, but we cannot talk to ZK: [{}]", e.getMessage());
-      } catch (InterruptedException e) {
-        // Restore the interrupted status
-        Thread.currentThread().interrupt();
-        log.warn("Interrupted", e);
-      } catch (Exception e)  {
-        log.error("Unexpected exception", e);
-      }
-    }
-
-  }
-
-  private void refreshAutoScalingConf(Watcher watcher) throws InterruptedException, IOException {
-    updateLock.lock();
-    try {
-      if (isClosed) {
-        return;
-      }
-      AutoScalingConfig currentConfig = cloudManager.getDistribStateManager().getAutoScalingConfig(watcher);
-      log.debug("Refreshing {} with znode version {}", ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, currentConfig.getZkVersion());
-      if (znodeVersion >= currentConfig.getZkVersion()) {
-        // protect against reordered watcher fires by ensuring that we only move forward
-        return;
-      }
-      autoScalingConfig = currentConfig;
-      znodeVersion = autoScalingConfig.getZkVersion();
-      Map<String, AutoScaling.Trigger> triggerMap = loadTriggers(triggerFactory, autoScalingConfig);
-
-      // remove all active triggers that have been removed from ZK
-      Set<String> trackingKeySet = activeTriggers.keySet();
-      trackingKeySet.retainAll(triggerMap.keySet());
-
-      // now lets add or remove triggers which have been enabled or disabled respectively
-      for (Map.Entry<String, AutoScaling.Trigger> entry : triggerMap.entrySet()) {
-        String triggerName = entry.getKey();
-        AutoScaling.Trigger trigger = entry.getValue();
-        if (trigger.isEnabled()) {
-          activeTriggers.put(triggerName, trigger);
-        } else {
-          activeTriggers.remove(triggerName);
-        }
-      }
-      updated.signalAll();
-    } finally {
-      updateLock.unlock();
-    }
-  }
-
-  private AutoScalingConfig withAutoAddReplicasTrigger(AutoScalingConfig autoScalingConfig) {
-    Map<String, Object> triggerProps = AutoScaling.AUTO_ADD_REPLICAS_TRIGGER_PROPS;
-    return withDefaultTrigger(triggerProps, autoScalingConfig);
-  }
-
-  private AutoScalingConfig withScheduledMaintenanceTrigger(AutoScalingConfig autoScalingConfig) {
-    Map<String, Object> triggerProps = AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_PROPS;
-    return withDefaultTrigger(triggerProps, autoScalingConfig);
-  }
-
-  private AutoScalingConfig withDefaultTrigger(Map<String, Object> triggerProps, AutoScalingConfig autoScalingConfig) {
-    String triggerName = (String) triggerProps.get("name");
-    Map<String, AutoScalingConfig.TriggerConfig> configs = autoScalingConfig.getTriggerConfigs();
-    for (AutoScalingConfig.TriggerConfig cfg : configs.values()) {
-      if (triggerName.equals(cfg.name)) {
-        // already has this trigger
-        return autoScalingConfig;
-      }
-    }
-    // need to add
-    triggerProps.computeIfPresent("waitFor", (k, v) -> (long) (cloudConfig.getAutoReplicaFailoverWaitAfterExpiration() / 1000));
-    AutoScalingConfig.TriggerConfig config = new AutoScalingConfig.TriggerConfig(triggerName, triggerProps);
-    autoScalingConfig = autoScalingConfig.withTriggerConfig(config);
-    // need to add SystemLogListener explicitly here
-    autoScalingConfig = AutoScalingHandler.withSystemLogListener(autoScalingConfig, triggerName);
-    return autoScalingConfig;
-  }
-
-  private static Map<String, AutoScaling.Trigger> loadTriggers(AutoScaling.TriggerFactory triggerFactory, AutoScalingConfig autoScalingConfig) {
-    Map<String, AutoScalingConfig.TriggerConfig> triggers = autoScalingConfig.getTriggerConfigs();
-    if (triggers == null) {
-      return Collections.emptyMap();
-    }
-
-    Map<String, AutoScaling.Trigger> triggerMap = new HashMap<>(triggers.size());
-
-    for (Map.Entry<String, AutoScalingConfig.TriggerConfig> entry : triggers.entrySet()) {
-      AutoScalingConfig.TriggerConfig cfg = entry.getValue();
-      TriggerEventType eventType = cfg.event;
-      String triggerName = entry.getKey();
-      try {
-        triggerMap.put(triggerName, triggerFactory.create(eventType, triggerName, cfg.properties));
-      } catch (TriggerValidationException e) {
-        log.warn("Error in trigger '" + triggerName + "' configuration, trigger config ignored: " + cfg, e);
-      }
-    }
-    return triggerMap;
-  }
-}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTrigger.java
deleted file mode 100644
index 5e25542..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTrigger.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud.autoscaling;
-
-import java.lang.invoke.MethodHandles;
-import java.text.ParseException;
-import java.time.Instant;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeFormatterBuilder;
-import java.time.temporal.ChronoField;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Locale;
-import java.util.Map;
-import java.util.TimeZone;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.params.AutoScalingParams;
-import org.apache.solr.common.util.TimeSource;
-import org.apache.solr.core.SolrResourceLoader;
-import org.apache.solr.util.DateMathParser;
-import org.apache.solr.util.TimeZoneUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.common.params.AutoScalingParams.PREFERRED_OP;
-
-/**
- * A trigger which creates {@link TriggerEventType#SCHEDULED} events as per the configured schedule
- */
-public class ScheduledTrigger extends TriggerBase {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  private static final String DEFAULT_GRACE_DURATION = "+15MINUTES";
-  private static final String LAST_RUN_AT = "lastRunAt";
-  static final String ACTUAL_EVENT_TIME = "actualEventTime";
-
-  private String everyStr;
-
-  private String graceDurationStr;
-
-  private String preferredOp;
-
-  private TimeZone timeZone;
-
-  private Instant lastRunAt;
-
-  public ScheduledTrigger(String name) {
-    super(TriggerEventType.SCHEDULED, name);
-    TriggerUtils.requiredProperties(requiredProperties, validProperties, "startTime");
-    TriggerUtils.validProperties(validProperties, "timeZone", "every", "graceDuration", AutoScalingParams.PREFERRED_OP);
-  }
-
-  @Override
-  public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException {
-    super.configure(loader, cloudManager, properties);
-    String timeZoneStr = (String) properties.get("timeZone");
-    this.timeZone = TimeZoneUtils.parseTimezone(timeZoneStr); // defaults to UTC
-
-    String startTimeStr = (String) properties.get("startTime");
-    this.everyStr = (String) properties.get("every");
-    this.graceDurationStr = (String) properties.getOrDefault("graceDuration", DEFAULT_GRACE_DURATION);
-
-    preferredOp = (String) properties.get(PREFERRED_OP);
-
-    // attempt parsing to validate date math strings
-    // explicitly set NOW because it may be different for simulated time
-    Date now = new Date(TimeUnit.NANOSECONDS.toMillis(cloudManager.getTimeSource().getEpochTimeNs()));
-    Instant startTime = parseStartTime(now, startTimeStr, timeZoneStr);
-    DateMathParser.parseMath(now, startTime + everyStr, timeZone);
-    DateMathParser.parseMath(now, startTime + graceDurationStr, timeZone);
-
-    // We set lastRunAt to be the startTime (which could be a date math expression such as 'NOW')
-    // Ordinarily, NOW will always be evaluated in this constructor so it may seem that
-    // the trigger will always fire the first time.
-    // However, the lastRunAt is overwritten with the value from ZK
-    // during restoreState() operation (which is performed before run()) so the trigger works correctly
-    this.lastRunAt = startTime;
-  }
-
-  private Instant parseStartTime(Date now, String startTimeStr, String timeZoneStr) throws TriggerValidationException {
-    try {
-      // try parsing startTime as an ISO-8601 date time string
-      return DateMathParser.parseMath(now, startTimeStr).toInstant();
-    } catch (SolrException e) {
-      if (e.code() != SolrException.ErrorCode.BAD_REQUEST.code) {
-        throw new TriggerValidationException("startTime", "error parsing value '" + startTimeStr + "': " + e.toString());
-      }
-    }
-    if (timeZoneStr == null)  {
-      throw new TriggerValidationException("timeZone",
-          "Either 'startTime' should be an ISO-8601 date time string or 'timeZone' must be not be null");
-    }
-    TimeZone timeZone = TimeZone.getTimeZone(timeZoneStr);
-    DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder()
-        .append(DateTimeFormatter.ISO_LOCAL_DATE).appendPattern("['T'[HH[:mm[:ss]]]]")
-        .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
-        .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
-        .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
-        .toFormatter(Locale.ROOT).withZone(timeZone.toZoneId());
-    try {
-      return Instant.from(dateTimeFormatter.parse(startTimeStr));
-    } catch (Exception e) {
-      throw new TriggerValidationException("startTime", "error parsing startTime '" + startTimeStr + "': " + e.toString());
-    }
-  }
-
-  @Override
-  protected Map<String, Object> getState() {
-    return Collections.singletonMap(LAST_RUN_AT, lastRunAt.toEpochMilli());
-  }
-
-  @Override
-  protected void setState(Map<String, Object> state) {
-    if (state.containsKey(LAST_RUN_AT)) {
-      this.lastRunAt = Instant.ofEpochMilli((Long) state.get(LAST_RUN_AT));
-    }
-  }
-
-  @Override
-  public void restoreState(AutoScaling.Trigger old) {
-    assert old.isClosed();
-    if (old instanceof ScheduledTrigger) {
-      ScheduledTrigger scheduledTrigger = (ScheduledTrigger) old;
-      this.lastRunAt = scheduledTrigger.lastRunAt;
-    } else  {
-      throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
-          "Unable to restore state from an unknown type of trigger");
-    }
-  }
-
-  @Override
-  public void run() {
-    synchronized (this) {
-      if (isClosed) {
-        log.warn("ScheduledTrigger ran but was already closed");
-        throw new RuntimeException("Trigger has been closed");
-      }
-    }
-
-    TimeSource timeSource = cloudManager.getTimeSource();
-    DateMathParser dateMathParser = new DateMathParser(timeZone);
-    dateMathParser.setNow(new Date(lastRunAt.toEpochMilli()));
-    Instant nextRunTime, nextPlusGrace;
-    try {
-      Date next = dateMathParser.parseMath(everyStr);
-      dateMathParser.setNow(next);
-      nextPlusGrace = dateMathParser.parseMath(graceDurationStr).toInstant();
-      nextRunTime = next.toInstant();
-    } catch (ParseException e) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
-          "Unable to calculate next run time. lastRan: " + lastRunAt.toString() + " and date math string: " + everyStr, e);
-    }
-
-    Instant now = Instant.ofEpochMilli(
-        TimeUnit.NANOSECONDS.toMillis(timeSource.getEpochTimeNs()));
-    AutoScaling.TriggerEventProcessor processor = processorRef.get();
-
-    if (now.isBefore(nextRunTime)) {
-      return; // it's not time yet
-    }
-    if (now.isAfter(nextPlusGrace)) {
-      // we are past time and we could not run per schedule so skip this event
-      if (log.isWarnEnabled())  {
-        log.warn("ScheduledTrigger was not able to run event at scheduled time: {}. Now: {}",
-            nextRunTime, now);
-      }
-      // Even though we are skipping the event, we need to notify any listeners of the IGNORED stage
-      // so we create a dummy event with the ignored=true flag and ScheduledTriggers will do the rest
-      if (processor != null && processor.process(new ScheduledEvent(getEventType(), getName(), timeSource.getTimeNs(),
-          preferredOp, now.toEpochMilli(), true))) {
-        lastRunAt = nextRunTime;
-        return;
-      }
-    }
-
-    if (processor != null)  {
-      if (log.isDebugEnabled()) {
-        log.debug("ScheduledTrigger {} firing registered processor for scheduled time {}, now={}", name,
-            nextRunTime, now);
-      }
-      if (processor.process(new ScheduledEvent(getEventType(), getName(), timeSource.getTimeNs(),
-          preferredOp, now.toEpochMilli()))) {
-        lastRunAt = nextRunTime; // set to nextRunTime instead of now to avoid drift
-      }
-    } else  {
-      lastRunAt = nextRunTime; // set to nextRunTime instead of now to avoid drift
-    }
-  }
-
-  public static class ScheduledEvent extends TriggerEvent {
-    public ScheduledEvent(TriggerEventType eventType, String source, long eventTime, String preferredOp, long actualEventTime) {
-      this(eventType, source, eventTime, preferredOp, actualEventTime, false);
-    }
-
-    public ScheduledEvent(TriggerEventType eventType, String source, long eventTime, String preferredOp, long actualEventTime, boolean ignored) {
-      super(eventType, source, eventTime, null, ignored);
-      if (preferredOp != null)  {
-        properties.put(PREFERRED_OP, preferredOp);
-      }
-      properties.put(ACTUAL_EVENT_TIME, actualEventTime);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
deleted file mode 100644
index 7c3cbb0..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ /dev/null
@@ -1,802 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud.autoscaling;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
-import org.apache.solr.client.solrj.cloud.DistribStateManager;
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
-import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest.RequestStatusResponse;
-import org.apache.solr.client.solrj.response.RequestStatusState;
-import org.apache.solr.cloud.Stats;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.IOUtils;
-import org.apache.solr.common.util.Utils;
-import org.apache.solr.core.SolrResourceLoader;
-import org.apache.solr.util.DefaultSolrThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.cloud.autoscaling.ExecutePlanAction.waitForTaskToFinish;
-import static org.apache.solr.common.params.AutoScalingParams.ACTION_THROTTLE_PERIOD_SECONDS;
-import static org.apache.solr.common.params.AutoScalingParams.TRIGGER_COOLDOWN_PERIOD_SECONDS;
-import static org.apache.solr.common.params.AutoScalingParams.TRIGGER_CORE_POOL_SIZE;
-import static org.apache.solr.common.params.AutoScalingParams.TRIGGER_SCHEDULE_DELAY_SECONDS;
-import static org.apache.solr.common.util.ExecutorUtil.awaitTermination;
-
-/**
- * Responsible for scheduling active triggers, starting and stopping them and
- * performing actions when they fire
- */
-public class ScheduledTriggers implements Closeable {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  // default values of the framework properties; they are collected in DEFAULT_PROPERTIES below
-  public static final int DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS = 1;
-  public static final int DEFAULT_ACTION_THROTTLE_PERIOD_SECONDS = 5;
-  public static final int DEFAULT_COOLDOWN_PERIOD_SECONDS = 5;
-  public static final int DEFAULT_TRIGGER_CORE_POOL_SIZE = 4;
-
-  /**
-   * Property name to default value. setAutoScalingConfig starts from a copy of this map
-   * when it works out which properties were changed by a new config.
-   */
-  static final Map<String, Object> DEFAULT_PROPERTIES = new HashMap<>();
-
-  static {
-    DEFAULT_PROPERTIES.put(TRIGGER_SCHEDULE_DELAY_SECONDS, DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS);
-    DEFAULT_PROPERTIES.put(TRIGGER_COOLDOWN_PERIOD_SECONDS, DEFAULT_COOLDOWN_PERIOD_SECONDS);
-    DEFAULT_PROPERTIES.put(TRIGGER_CORE_POOL_SIZE, DEFAULT_TRIGGER_CORE_POOL_SIZE);
-    DEFAULT_PROPERTIES.put(ACTION_THROTTLE_PERIOD_SECONDS, DEFAULT_ACTION_THROTTLE_PERIOD_SECONDS);
-  }
-
-  /**
-   * Managed trigger wrappers keyed by trigger name
-   */
-  private final Map<String, TriggerWrapper> scheduledTriggerWrappers = new ConcurrentHashMap<>();
-
-  /**
-   * Thread pool for scheduling the triggers
-   */
-  private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
-
-  /**
-   * Single threaded executor to run the actions upon a trigger event. We rely on this being a single
-   * threaded executor to ensure that trigger fires do not step on each other as well as to ensure
-   * that we do not run scheduled trigger threads while an action has been submitted to this executor
-   */
-  private final ExecutorService actionExecutor;
-
-  /**
-   * Set to true by close(); read and written only inside synchronized blocks / methods of this class
-   */
-  private boolean isClosed = false;
-
-  /**
-   * True from the moment an event is accepted for processing until its actions have finished;
-   * TriggerWrapper.run() does not fire the trigger while this is set
-   */
-  private final AtomicBoolean hasPendingActions = new AtomicBoolean(false);
-
-  /**
-   * Start of the current cooldown period, in nanoseconds as reported by the cloud manager's time source
-   */
-  private final AtomicLong cooldownStart = new AtomicLong();
-
-  /**
-   * Length of the cooldown period in nanoseconds
-   */
-  private final AtomicLong cooldownPeriod = new AtomicLong(TimeUnit.SECONDS.toNanos(DEFAULT_COOLDOWN_PERIOD_SECONDS));
-
-  /**
-   * Delay between trigger runs in seconds (converted to milliseconds when a trigger is scheduled)
-   */
-  private final AtomicLong triggerDelay = new AtomicLong(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS);
-
-  private final SolrCloudManager cloudManager;
-
-  /**
-   * Obtained from cloudManager in the constructor; used to remove trigger state and events in removeTriggerZKData
-   */
-  private final DistribStateManager stateManager;
-
-  /**
-   * Used to instantiate and configure TriggerListener implementations
-   */
-  private final SolrResourceLoader loader;
-
-  /**
-   * Passed to every TriggerWrapper, which hands it to its TriggerEventQueue
-   */
-  private final Stats queueStats;
-
-  private final TriggerListeners listeners;
-
-  /**
-   * The config most recently passed to setAutoScalingConfig; null until the first call
-   */
-  private AutoScalingConfig autoScalingConfig;
-
-  /**
-   * Creates the pool that schedules triggers and the single-threaded executor that runs trigger
-   * actions, and starts with the cooldown period already elapsed.
-   *
-   * @param loader       resource loader kept for creating trigger listeners
-   * @param cloudManager cloud manager supplying the state manager and the time source
-   */
-  public ScheduledTriggers(SolrResourceLoader loader, SolrCloudManager cloudManager) {
-    final ScheduledThreadPoolExecutor triggerPool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(
-        DEFAULT_TRIGGER_CORE_POOL_SIZE, new DefaultSolrThreadFactory("ScheduledTrigger"));
-    triggerPool.setRemoveOnCancelPolicy(true);
-    triggerPool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
-    this.scheduledThreadPoolExecutor = triggerPool;
-    this.actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(
-        new DefaultSolrThreadFactory("AutoscalingActionExecutor"));
-
-    this.cloudManager = cloudManager;
-    this.stateManager = cloudManager.getDistribStateManager();
-    this.loader = loader;
-    this.queueStats = new Stats();
-    this.listeners = new TriggerListeners();
-
-    // initialize cooldown timer: back-date the start by one full period
-    final long nowNs = cloudManager.getTimeSource().getTimeNs();
-    this.cooldownStart.set(nowNs - this.cooldownPeriod.get());
-  }
-
-  /**
-   * Set the current autoscaling config. This is invoked by {@link OverseerTriggerThread} when autoscaling.json is updated,
-   * and it re-initializes trigger listeners and other properties used by the framework.
-   * <p>
-   * The effective current properties (defaults overlaid with the previous config, if any) are compared
-   * with the properties of the new config. For every property that is present in the new config with a
-   * different value the corresponding setting is applied: the trigger schedule delay (already scheduled
-   * triggers are cancelled and re-scheduled), the cooldown period, or the core pool size of the
-   * scheduling pool. Finally the new config is remembered and the cooldown timer is reset so that the
-   * cooldown period counts as already elapsed.
-   *
-   * @param autoScalingConfig current autoscaling.json
-   */
-  public void setAutoScalingConfig(AutoScalingConfig autoScalingConfig) {
-    Map<String, Object> currentProps = new HashMap<>(DEFAULT_PROPERTIES);
-    if (this.autoScalingConfig != null) {
-      currentProps.putAll(this.autoScalingConfig.getProperties());
-    }
-
-    // reset listeners early in order to capture first execution of newly scheduled triggers
-    listeners.setAutoScalingConfig(autoScalingConfig);
-
-    // the properties of the new config do not change while we iterate, so fetch them only once
-    final Map<String, Object> newProps = autoScalingConfig.getProperties();
-    for (Map.Entry<String, Object> entry : currentProps.entrySet()) {
-      final String key = entry.getKey();
-      if (!newProps.containsKey(key)) {
-        continue; // the new config does not set this property
-      }
-      final Object newValue = newProps.get(key);
-      if (entry.getValue().equals(newValue)) {
-        continue; // value did not change
-      }
-      log.debug("Changing value of autoscaling property: {} from: {} to: {}", key, entry.getValue(), newValue);
-      switch (key) {
-        case TRIGGER_SCHEDULE_DELAY_SECONDS:
-          triggerDelay.set(((Number) newValue).intValue());
-          // same monitor as add() / pauseTriggers() / resumeTriggers(), so nobody else swaps the futures meanwhile
-          synchronized (this) {
-            scheduledTriggerWrappers.forEach((s, triggerWrapper) -> {
-              if (triggerWrapper.scheduledFuture.cancel(false)) {
-                // re-schedule right away with the new delay between runs
-                triggerWrapper.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(
-                    triggerWrapper, 0,
-                    cloudManager.getTimeSource().convertDelay(TimeUnit.SECONDS, triggerDelay.get(), TimeUnit.MILLISECONDS),
-                    TimeUnit.MILLISECONDS);
-              } else  {
-                log.debug("Failed to cancel scheduled task: {}", s);
-              }
-            });
-          }
-          break;
-        case TRIGGER_COOLDOWN_PERIOD_SECONDS:
-          cooldownPeriod.set(TimeUnit.SECONDS.toNanos(((Number) newValue).longValue()));
-          break;
-        case TRIGGER_CORE_POOL_SIZE:
-          this.scheduledThreadPoolExecutor.setCorePoolSize(((Number) newValue).intValue());
-          break;
-      }
-    }
-
-    this.autoScalingConfig = autoScalingConfig;
-    // reset cooldown
-    cooldownStart.set(cloudManager.getTimeSource().getTimeNs() - cooldownPeriod.get());
-  }
-
-  /**
-   * Adds a new trigger or replaces an existing one. The replaced trigger, if any, is closed
-   * <b>before</b> the new trigger is run. If a trigger is replaced with itself then this
-   * operation becomes a no-op.
-   * <p>
-   * The method also installs the event processor of the trigger. The processor returns
-   * <code>false</code> (event not accepted) when the cloud manager is closed, when the source
-   * trigger is unknown or closed, during the cooldown period, when the action executor has been
-   * shut down, or when another event is already being processed. It returns <code>true</code> for
-   * events flagged as ignored and for events that were accepted for processing.
-   * <p>
-   * If the wrapper for the trigger cannot be created the error is logged and the method returns
-   * without adding the trigger, unless this instance has been closed in the meantime.
-   *
-   * @param newTrigger the trigger to be managed
-   * @throws AlreadyClosedException if this class has already been closed
-   */
-  public synchronized void add(AutoScaling.Trigger newTrigger) throws Exception {
-    if (isClosed) {
-      throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used anymore");
-    }
-    TriggerWrapper st;
-    try {
-      st = new TriggerWrapper(newTrigger, cloudManager, queueStats);
-    } catch (Exception e) {
-      if (isClosed) {
-        throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used anymore");
-      }
-      if (cloudManager.isClosed()) {
-        log.error("Failed to add trigger " + newTrigger.getName() + " - closing or disconnected from data provider", e);
-      } else {
-        log.error("Failed to add trigger " + newTrigger.getName(), e);
-      }
-      return;
-    }
-    TriggerWrapper triggerWrapper = st; // effectively final copy for use in the lambdas below
-
-    TriggerWrapper old = scheduledTriggerWrappers.putIfAbsent(newTrigger.getName(), triggerWrapper);
-    if (old != null) {
-      if (old.trigger.equals(newTrigger)) {
-        // the trigger wasn't actually modified so we do nothing
-        return;
-      }
-      // close the previous instance, carry its state over and skip event replay for the replacement
-      IOUtils.closeQuietly(old);
-      newTrigger.restoreState(old.trigger);
-      triggerWrapper.setReplay(false);
-      scheduledTriggerWrappers.replace(newTrigger.getName(), triggerWrapper);
-    }
-    newTrigger.setProcessor(event -> {
-      // work on a snapshot of the listeners
-      TriggerListeners triggerListeners = listeners.copy();
-      if (cloudManager.isClosed()) {
-        String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because Solr has been shutdown.", event.toString());
-        log.warn(msg);
-        triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
-        return false;
-      }
-      TriggerWrapper scheduledSource = scheduledTriggerWrappers.get(event.getSource());
-      if (scheduledSource == null) {
-        String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the source trigger: %s doesn't exist.", event.toString(), event.getSource());
-        triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.FAILED, msg);
-        log.warn(msg);
-        return false;
-      }
-      boolean replaying = event.getProperty(TriggerEvent.REPLAYING) != null ? (Boolean)event.getProperty(TriggerEvent.REPLAYING) : false;
-      AutoScaling.Trigger source = scheduledSource.trigger;
-      if (scheduledSource.isClosed || source.isClosed()) {
-        String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the source trigger: %s has already been closed", event.toString(), source);
-        triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
-        log.warn(msg);
-        // we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
-        return false;
-      }
-      if (event.isIgnored())  {
-        log.debug("-------- Ignoring event: " + event);
-        event.getProperties().put(TriggerEvent.IGNORED, true);
-        triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.IGNORED, "Event was ignored.");
-        return true; // always return true for ignored events
-      }
-      // even though we pause all triggers during action execution there is a possibility that a trigger was already
-      // running at the time and would have already created an event so we reject such events during cooldown period
-      if (cooldownStart.get() + cooldownPeriod.get() > cloudManager.getTimeSource().getTimeNs()) {
-        log.debug("-------- Cooldown period - rejecting event: " + event);
-        event.getProperties().put(TriggerEvent.COOLDOWN, true);
-        triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.IGNORED, "In cooldown period.");
-        return false;
-      } else {
-        log.debug("++++++++ Cooldown inactive - processing event: " + event);
-      }
-      // only the caller that flips the flag from false to true may process its event
-      if (hasPendingActions.compareAndSet(false, true)) {
-        // pause all triggers while we execute actions so triggers do not operate on a cluster in transition
-        pauseTriggers();
-
-        final boolean enqueued;
-        if (replaying) {
-          // a replayed event is not enqueued again
-          enqueued = false;
-        } else {
-          enqueued = triggerWrapper.enqueue(event);
-        }
-        // fire STARTED event listeners after enqueuing the event is successful
-        // NOTE(review): the value of 'enqueued' is not checked before firing STARTED - confirm this is intended
-        triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.STARTED);
-        List<TriggerAction> actions = source.getActions();
-        if (actions != null) {
-          if (actionExecutor.isShutdown()) {
-            String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s from trigger %s because the executor has already been closed", event.toString(), source);
-            triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
-            log.warn(msg);
-            // we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
-            // NOTE(review): hasPendingActions is still true and the triggers are still paused on this path
-            return false;
-          }
-          actionExecutor.submit(() -> {
-            assert hasPendingActions.get();
-            long eventProcessingStart = cloudManager.getTimeSource().getTimeNs();
-            TriggerListeners triggerListeners1 = triggerListeners.copy();
-            log.debug("-- processing actions for " + event);
-            try {
-              // in future, we could wait for pending tasks in a different thread and re-enqueue
-              // this event so that we continue processing other events and not block this action executor
-              waitForPendingTasks(newTrigger, actions);
-
-              ActionContext actionContext = new ActionContext(cloudManager, newTrigger, new HashMap<>());
-              for (TriggerAction action : actions) {
-                // the names of the actions started so far are accumulated in the context properties
-                List<String> beforeActions = (List<String>) actionContext.getProperties().computeIfAbsent(TriggerEventProcessorStage.BEFORE_ACTION.toString(), k -> new ArrayList<String>());
-                beforeActions.add(action.getName());
-                triggerListeners1.fireListeners(event.getSource(), event, TriggerEventProcessorStage.BEFORE_ACTION, action.getName(), actionContext);
-                try {
-                  action.process(event, actionContext);
-                } catch (Exception e) {
-                  // FAILED is fired here, which is why the TriggerActionException handler below only logs
-                  triggerListeners1.fireListeners(event.getSource(), event, TriggerEventProcessorStage.FAILED, action.getName(), actionContext, e, null);
-                  throw new TriggerActionException(event.getSource(), action.getName(), "Error processing action for trigger event: " + event, e);
-                }
-                List<String> afterActions = (List<String>) actionContext.getProperties().computeIfAbsent(TriggerEventProcessorStage.AFTER_ACTION.toString(), k -> new ArrayList<String>());
-                afterActions.add(action.getName());
-                triggerListeners1.fireListeners(event.getSource(), event, TriggerEventProcessorStage.AFTER_ACTION, action.getName(), actionContext);
-              }
-              if (enqueued) {
-                // all actions succeeded: remove the event from the queue of the trigger
-                TriggerEvent ev = triggerWrapper.dequeue();
-                assert ev.getId().equals(event.getId());
-              }
-              triggerListeners1.fireListeners(event.getSource(), event, TriggerEventProcessorStage.SUCCEEDED);
-            } catch (TriggerActionException e) {
-              log.warn("Exception executing actions", e);
-            } catch (Exception e) {
-              triggerListeners1.fireListeners(event.getSource(), event, TriggerEventProcessorStage.FAILED);
-              log.warn("Unhandled exception executing actions", e);
-            } finally {
-              // start the cooldown period and allow new events regardless of the outcome
-              cooldownStart.set(cloudManager.getTimeSource().getTimeNs());
-              hasPendingActions.set(false);
-              // resume triggers after cool down period
-              resumeTriggers(cloudManager.getTimeSource().convertDelay(TimeUnit.NANOSECONDS, cooldownPeriod.get(), TimeUnit.MILLISECONDS));
-            }
-            log.debug("-- processing took {} ms for event id={}",
-                TimeUnit.NANOSECONDS.toMillis(cloudManager.getTimeSource().getTimeNs() - eventProcessingStart), event.id);
-          });
-        } else {
-          // the trigger has no actions: the event is done as soon as it has been taken off the queue again
-          if (enqueued) {
-            TriggerEvent ev = triggerWrapper.dequeue();
-            if (!ev.getId().equals(event.getId())) {
-              throw new RuntimeException("Wrong event dequeued, queue of " + triggerWrapper.trigger.getName()
-              + " is broken! Expected event=" + event + " but got " + ev);
-            }
-          }
-          triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.SUCCEEDED);
-          hasPendingActions.set(false);
-          // resume triggers now
-          resumeTriggers(0);
-        }
-        return true;
-      } else {
-        // there is an action in the queue and we don't want to enqueue another until it is complete
-        triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.IGNORED, "Already processing another event.");
-        return false;
-      }
-    });
-    newTrigger.init(); // mark as ready for scheduling
-    // first run immediately, then with the configured delay between the end of one run and the start of the next
-    triggerWrapper.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(triggerWrapper, 0,
-        cloudManager.getTimeSource().convertDelay(TimeUnit.SECONDS, triggerDelay.get(), TimeUnit.MILLISECONDS),
-        TimeUnit.MILLISECONDS);
-  }
-
-  /**
-   * Cancels the scheduled future of every managed trigger. Futures are cancelled without
-   * interruption, so a trigger invocation that is currently in progress is allowed to finish.
-   * @lucene.internal
-   */
-  public synchronized void pauseTriggers()  {
-    if (log.isDebugEnabled()) {
-      log.debug("Pausing all triggers: {}", scheduledTriggerWrappers.keySet());
-    }
-    for (TriggerWrapper wrapper : scheduledTriggerWrappers.values()) {
-      // false: do not interrupt a run that has already started
-      wrapper.scheduledFuture.cancel(false);
-    }
-  }
-
-  /**
-   * Schedules again every trigger whose scheduled future is in the cancelled state. Triggers
-   * that are still scheduled are left alone.
-   * @param afterDelayMillis the initial delay in milliseconds after which triggers should be resumed
-   * @lucene.internal
-   */
-  public synchronized void resumeTriggers(long afterDelayMillis) {
-    for (Map.Entry<String, TriggerWrapper> scheduled : scheduledTriggerWrappers.entrySet()) {
-      final TriggerWrapper wrapper = scheduled.getValue();
-      if (!wrapper.scheduledFuture.isCancelled()) {
-        continue; // not paused, nothing to resume
-      }
-      log.debug("Resuming trigger: {} after {}ms", scheduled.getKey(), afterDelayMillis);
-      wrapper.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(
-          wrapper,
-          afterDelayMillis,
-          cloudManager.getTimeSource().convertDelay(TimeUnit.SECONDS, triggerDelay.get(), TimeUnit.MILLISECONDS),
-          TimeUnit.MILLISECONDS);
-    }
-  }
-
-  /**
-   * Waits for tasks recorded under the state path of the given trigger before its actions are run.
-   * <p>
-   * For every {@link ExecutePlanAction} among the actions the children of
-   * <code>SOLR_AUTOSCALING_TRIGGER_STATE_PATH/&lt;trigger name&gt;/&lt;action name&gt;</code> are read. The data
-   * of each child is parsed as JSON and its <code>requestid</code> entry is passed to
-   * <code>waitForTaskToFinish</code>. When the reported state is COMPLETED, FAILED or NOT_FOUND the
-   * child node is removed.
-   * <p>
-   * Most failures are only logged so that they do not keep the actions from executing.
-   *
-   * @param newTrigger the trigger whose state path is inspected
-   * @param actions the actions of the trigger
-   * @throws AlreadyClosedException if an exception occurs and the cloud manager reports that it is closed
-   * @throws SolrException (SERVER_ERROR) if the thread is interrupted; the interrupt flag is restored first
-   */
-  private void waitForPendingTasks(AutoScaling.Trigger newTrigger, List<TriggerAction> actions) throws AlreadyClosedException {
-    // note: this local variable shadows the field of the same name
-    DistribStateManager stateManager = cloudManager.getDistribStateManager();
-    try {
-
-      for (TriggerAction action : actions) {
-        if (action instanceof ExecutePlanAction) {
-          String parentPath = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + newTrigger.getName() + "/" + action.getName();
-          if (!stateManager.hasData(parentPath))  {
-            // NOTE(review): this leaves the loop over all actions, so later ExecutePlanAction
-            // instances are not checked - confirm that 'continue' is not what was meant
-            break;
-          }
-          List<String> children = stateManager.listData(parentPath);
-          if (children != null) {
-            for (String child : children) {
-              String path = parentPath + '/' + child;
-              VersionedData data = stateManager.getData(path, null);
-              if (data != null) {
-                Map map = (Map) Utils.fromJSON(data.getData());
-                String requestid = (String) map.get("requestid");
-                try {
-                  log.debug("Found pending task with requestid={}", requestid);
-                  RequestStatusResponse statusResponse = waitForTaskToFinish(cloudManager, requestid,
-                      ExecutePlanAction.DEFAULT_TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-                  if (statusResponse != null) {
-                    RequestStatusState state = statusResponse.getRequestStatus();
-                    // the node is removed only for these states; for any other state it is left in place
-                    if (state == RequestStatusState.COMPLETED || state == RequestStatusState.FAILED || state == RequestStatusState.NOT_FOUND) {
-                      stateManager.removeData(path, -1);
-                    }
-                  }
-                } catch (Exception e) {
-                  // rethrown exceptions end up in the outer handlers below; anything else is
-                  // logged and the loop moves on to the next child
-                  if (cloudManager.isClosed())  {
-                    throw e; // propagate the abort to the caller
-                  }
-                  Throwable rootCause = ExceptionUtils.getRootCause(e);
-                  // NOTE(review): getMessage() may return null, which would raise a NullPointerException here
-                  if (rootCause instanceof IllegalStateException && rootCause.getMessage().contains("Connection pool shut down")) {
-                    throw e;
-                  }
-                  if (rootCause instanceof TimeoutException && rootCause.getMessage().contains("Could not connect to ZooKeeper")) {
-                    throw e;
-                  }
-                  log.error("Unexpected exception while waiting for pending task with requestid: " + requestid + " to finish", e);
-                }
-              }
-            }
-          }
-        }
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Thread interrupted", e);
-    } catch (Exception e) {
-      if (cloudManager.isClosed())  {
-        throw new AlreadyClosedException("The Solr instance has been shutdown");
-      }
-      // we catch but don't rethrow because a failure to wait for pending tasks
-      // should not keep the actions from executing
-      log.error("Unexpected exception while waiting for pending tasks to finish", e);
-    }
-  }
-
-  /**
-   * Removes every managed trigger by calling {@link #remove(String)} for each name in a
-   * snapshot of the currently scheduled trigger names.
-   */
-  public synchronized void removeAll() {
-    // iterate over a copy of the names because remove() modifies the underlying map
-    for (String triggerName : getScheduledTriggerNames()) {
-      log.info("-- removing trigger: " + triggerName);
-      remove(triggerName);
-    }
-  }
-
-  /**
-   * Takes the named trigger out of the set of managed triggers, closes it quietly and removes
-   * the state and event data stored for that trigger name.
-   *
-   * @param triggerName the name of the trigger to be removed
-   */
-  public synchronized void remove(String triggerName) {
-    // unregister and close in one step
-    IOUtils.closeQuietly(scheduledTriggerWrappers.remove(triggerName));
-    removeTriggerZKData(triggerName);
-  }
-
-  /**
-   * Recursively removes the state node and the events node of the given trigger. Each removal
-   * is attempted independently of the other; failures are logged as warnings and not propagated.
-   *
-   * @param triggerName name of the trigger whose data should be removed
-   */
-  private void removeTriggerZKData(String triggerName) {
-    final String triggerStatePath = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + triggerName;
-    try {
-      stateManager.removeRecursively(triggerStatePath, true, true);
-    } catch (Exception stateFailure) {
-      log.warn("Failed to remove state for removed trigger " + triggerStatePath, stateFailure);
-    }
-
-    final String triggerEventsPath = ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH + "/" + triggerName;
-    try {
-      stateManager.removeRecursively(triggerEventsPath, true, true);
-    } catch (Exception eventsFailure) {
-      log.warn("Failed to remove events for removed trigger " + triggerEventsPath, eventsFailure);
-    }
-  }
-
-  /**
-   * @return an unmodifiable snapshot of the names of all triggers being managed by this class;
-   * later additions or removals are not reflected in the returned set
-   */
-  public synchronized Set<String> getScheduledTriggerNames() {
-    final Set<String> snapshot = new HashSet<>(scheduledTriggerWrappers.keySet()); // shallow copy
-    return Collections.unmodifiableSet(snapshot);
-  }
-
-  /**
-   * Closes all managed triggers and shuts down both executors and the listeners.
-   * <p>
-   * The triggers are closed and forgotten while holding the monitor of this instance. After that
-   * both executors are shut down with <code>shutdownNow()</code>, the listeners are closed, and
-   * finally the method waits for the termination of the action executor and of the scheduling pool.
-   */
-  @Override
-  public void close() throws IOException {
-    synchronized (this) {
-      // mark that we are closed
-      isClosed = true;
-      for (TriggerWrapper triggerWrapper : scheduledTriggerWrappers.values()) {
-        IOUtils.closeQuietly(triggerWrapper);
-      }
-      scheduledTriggerWrappers.clear();
-    }
-    // shutdown and interrupt all running tasks because there's no longer any
-    // guarantee about cluster state
-    log.debug("Shutting down scheduled thread pool executor now");
-    scheduledThreadPoolExecutor.shutdownNow();
-
-    log.debug("Shutting down action executor now");
-    actionExecutor.shutdownNow();
-
-    listeners.close();
-
-    // both executors were already told to stop above; here we only wait for them to finish
-    log.debug("Awaiting termination for action executor");
-    awaitTermination(actionExecutor);
-
-    log.debug("Awaiting termination for scheduled thread pool executor");
-    awaitTermination(scheduledThreadPoolExecutor);
-
-    log.debug("ScheduledTriggers closed completely");
-  }
-
-  /**
-   * The unit that is submitted to the scheduling pool for a trigger. It bundles the trigger with
-   * its event queue and its scheduled future. On its first run it replays the events found in
-   * the queue and restores the saved trigger state, unless replay has been switched off; on every
-   * run it invokes the trigger and saves the trigger state afterwards.
-   */
-  private class TriggerWrapper implements Runnable, Closeable {
-    AutoScaling.Trigger trigger;
-    // future of the currently scheduled task; replaced whenever the trigger is re-scheduled
-    ScheduledFuture<?> scheduledFuture;
-    TriggerEventQueue queue;
-    // true until the first run has replayed the queued events (see run())
-    boolean replay;
-    volatile boolean isClosed;
-
-    /**
-     * Creates the wrapper together with the event queue of the trigger; replay is enabled initially.
-     */
-    TriggerWrapper(AutoScaling.Trigger trigger, SolrCloudManager cloudManager, Stats stats) throws IOException {
-      this.trigger = trigger;
-      this.queue = new TriggerEventQueue(cloudManager, trigger.getName(), stats);
-      this.replay = true;
-      this.isClosed = false;
-    }
-
-    public void setReplay(boolean replay) {
-      this.replay = replay;
-    }
-
-    /**
-     * Offers the event to the queue of this trigger.
-     *
-     * @return the result of offering the event to the queue
-     * @throws AlreadyClosedException if this wrapper has been closed
-     */
-    public boolean enqueue(TriggerEvent event) {
-      if (isClosed) {
-        throw new AlreadyClosedException("ScheduledTrigger " + trigger.getName() + " has been closed.");
-      }
-      return queue.offerEvent(event);
-    }
-
-    /**
-     * Polls the next event from the queue of this trigger.
-     *
-     * @return the polled event
-     * @throws AlreadyClosedException if this wrapper has been closed
-     */
-    public TriggerEvent dequeue() {
-      if (isClosed) {
-        throw new AlreadyClosedException("ScheduledTrigger " + trigger.getName() + " has been closed.");
-      }
-      TriggerEvent event = queue.pollEvent();
-      return event;
-    }
-
-    /**
-     * Runs the trigger once unless an action is pending.
-     *
-     * @throws AlreadyClosedException if this wrapper has been closed
-     */
-    @Override
-    public void run() {
-      if (isClosed) {
-        throw new AlreadyClosedException("ScheduledTrigger " + trigger.getName() + " has been closed.");
-      }
-      // fire a trigger only if an action is not pending
-      // note this is not fool proof e.g. it does not prevent an action being executed while a trigger
-      // is still executing. There is additional protection against that scenario in the event listener.
-      if (!hasPendingActions.get())  {
-        // this synchronization is usually never under contention
-        // but the only reason to have it here is to ensure that when the set-properties API is used
-        // to change the schedule delay, we can safely cancel the old scheduled task
-        // and create another one with the new delay without worrying about concurrent
-        // execution of the same trigger instance
-        synchronized (TriggerWrapper.this) {
-          // replay accumulated events on first run, if any
-          if (replay) {
-            TriggerEvent event;
-            // peek first without removing - we may crash before calling the listener
-            while ((event = queue.peekEvent()) != null) {
-              // override REPLAYING=true
-              event.getProperties().put(TriggerEvent.REPLAYING, true);
-              if (! trigger.getProcessor().process(event)) {
-                log.error("Failed to re-play event, discarding: " + event);
-              }
-              queue.pollEvent(); // always remove it from queue
-            }
-            // now restore saved state to possibly generate new events from old state on the first run
-            try {
-              trigger.restoreState();
-            } catch (Exception e) {
-              // log but don't throw - see below
-              log.error("Error restoring trigger state " + trigger.getName(), e);
-            }
-            replay = false;
-          }
-          try {
-            trigger.run();
-          } catch (Exception e) {
-            // log but do not propagate exception because an exception thrown from a scheduled operation
-            // will suppress future executions
-            log.error("Unexpected exception from trigger: " + trigger.getName(), e);
-          } finally {
-            // checkpoint after each run
-            trigger.saveState();
-          }
-        }
-      }
-    }
-
-    /**
-     * Marks this wrapper as closed, cancels the scheduled task if there is one (interrupting it
-     * when it is running) and closes the trigger quietly.
-     */
-    @Override
-    public void close() throws IOException {
-      isClosed = true;
-      if (scheduledFuture != null) {
-        scheduledFuture.cancel(true);
-      }
-      IOUtils.closeQuietly(trigger);
-    }
-  }
-
-  private class TriggerListeners {
-    // trigger name -> stage -> listeners registered for that stage
-    Map<String, Map<TriggerEventProcessorStage, List<TriggerListener>>> listenersPerStage = new HashMap<>();
-    // listener name (from its config) -> listener instance
-    Map<String, TriggerListener> listenersPerName = new HashMap<>();
-    // held by setAutoScalingConfig while the two maps above are rebuilt
-    ReentrantLock updateLock = new ReentrantLock();
-
-    /** Creates an instance without any registered listeners. */
-    public TriggerListeners() {
-
-    }
-
-    /**
-     * Copy constructor. The outer map, the per-stage maps and the listener lists are all
-     * duplicated; the listener instances themselves are shared with the source.
-     *
-     * @param listenersPerStage per-trigger, per-stage listeners to copy
-     * @param listenersPerName listeners by name to copy
-     */
-    private TriggerListeners(Map<String, Map<TriggerEventProcessorStage, List<TriggerListener>>> listenersPerStage,
-                             Map<String, TriggerListener> listenersPerName) {
-      final Map<String, Map<TriggerEventProcessorStage, List<TriggerListener>>> stagesCopy = new HashMap<>();
-      for (Map.Entry<String, Map<TriggerEventProcessorStage, List<TriggerListener>>> perTrigger : listenersPerStage.entrySet()) {
-        final Map<TriggerEventProcessorStage, List<TriggerListener>> perStageCopy = new HashMap<>();
-        for (Map.Entry<TriggerEventProcessorStage, List<TriggerListener>> perStage : perTrigger.getValue().entrySet()) {
-          // a fresh list per stage so the copy can change independently of the source
-          perStageCopy.put(perStage.getKey(), new ArrayList<>(perStage.getValue()));
-        }
-        stagesCopy.put(perTrigger.getKey(), perStageCopy);
-      }
-      this.listenersPerStage = stagesCopy;
-      this.listenersPerName = new HashMap<>(listenersPerName);
-    }
-
-    /**
-     * Creates a snapshot of the registered listeners. The maps and lists are duplicated, the
-     * listener instances are shared.
-     * <p>
-     * The snapshot is taken while holding {@code updateLock}, the lock under which
-     * {@code setAutoScalingConfig} clears and rebuilds the maps. Without it a copy made
-     * concurrently with a config update could fail with a ConcurrentModificationException
-     * or capture a half-rebuilt state.
-     *
-     * @return an independent copy of this instance
-     */
-    public TriggerListeners copy() {
-      updateLock.lock();
-      try {
-        return new TriggerListeners(listenersPerStage, listenersPerName);
-      } finally {
-        updateLock.unlock();
-      }
-    }
-
-    /**
-     * Rebuilds the listener registry from the supplied autoscaling configuration.
-     * <p>
-     * The per-stage index is always recreated from scratch. Listener instances in
-     * {@code listenersPerName} are reused when their configuration is unchanged,
-     * closed and replaced when it changed, and closed and removed when either their
-     * trigger or their own configuration no longer exists. The whole update runs
-     * while holding {@code updateLock}.
-     *
-     * @param autoScalingConfig configuration providing the trigger configs and the
-     *                          trigger listener configs to apply
-     */
-    void setAutoScalingConfig(AutoScalingConfig autoScalingConfig) {
-      updateLock.lock();
-      try {
-        // we will recreate this from scratch
-        listenersPerStage.clear();
-        Set<String> triggerNames = autoScalingConfig.getTriggerConfigs().keySet();
-        Map<String, AutoScalingConfig.TriggerListenerConfig> configs = autoScalingConfig.getTriggerListenerConfigs();
-        Set<String> listenerNames = configs.values().stream().map(cfg -> cfg.name).collect(Collectors.toSet());
-        // close those for non-existent triggers and nonexistent listener configs
-        for (Iterator<Map.Entry<String, TriggerListener>> it = listenersPerName.entrySet().iterator(); it.hasNext(); ) {
-          Map.Entry<String, TriggerListener> entry = it.next();
-          String name = entry.getKey();
-          TriggerListener listener = entry.getValue();
-          if (!triggerNames.contains(listener.getConfig().trigger) || !listenerNames.contains(name)) {
-            try {
-              listener.close();
-            } catch (Exception e) {
-              log.warn("Exception closing old listener " + listener.getConfig(), e);
-            }
-            it.remove();
-          }
-        }
-        for (Map.Entry<String, AutoScalingConfig.TriggerListenerConfig> entry : configs.entrySet()) {
-          AutoScalingConfig.TriggerListenerConfig config = entry.getValue();
-          if (!triggerNames.contains(config.trigger)) {
-            log.debug("-- skipping listener for non-existent trigger: {}", config);
-            continue;
-          }
-          // find previous instance and reuse if possible
-          TriggerListener oldListener = listenersPerName.get(config.name);
-          TriggerListener listener = null;
-          if (oldListener != null) {
-            if (!oldListener.getConfig().equals(config)) { // changed config
-              try {
-                oldListener.close();
-              } catch (Exception e) {
-                log.warn("Exception closing old listener " + oldListener.getConfig(), e);
-              }
-              // Drop the closed instance right away: if creating or initializing the
-              // replacement fails below, a closed listener must not stay registered
-              // where a later update could pick it up for reuse.
-              listenersPerName.remove(config.name);
-            } else {
-              listener = oldListener; // reuse
-            }
-          }
-          if (listener == null) { // create new instance
-            String clazz = config.listenerClass;
-            try {
-              listener = loader.newInstance(clazz, TriggerListener.class);
-            } catch (Exception e) {
-              log.warn("Invalid TriggerListener class name '" + clazz + "', skipping...", e);
-            }
-            if (listener != null) {
-              try {
-                listener.configure(loader, cloudManager, config);
-                listener.init();
-                listenersPerName.put(config.name, listener);
-              } catch (Exception e) {
-                log.warn("Error initializing TriggerListener " + config, e);
-                IOUtils.closeQuietly(listener);
-                listener = null;
-              }
-            }
-          }
-          if (listener == null) {
-            // instantiation or initialization failed; nothing to index for this config
-            continue;
-          }
-          // add per stage
-          for (TriggerEventProcessorStage stage : config.stages) {
-            addPerStage(config.trigger, stage, listener);
-          }
-          // add also for beforeAction / afterAction TriggerStage
-          if (!config.beforeActions.isEmpty()) {
-            addPerStage(config.trigger, TriggerEventProcessorStage.BEFORE_ACTION, listener);
-          }
-          if (!config.afterActions.isEmpty()) {
-            addPerStage(config.trigger, TriggerEventProcessorStage.AFTER_ACTION, listener);
-          }
-        }
-      } finally {
-        updateLock.unlock();
-      }
-    }
-
-    /**
-     * Registers a listener under the given trigger name and processing stage,
-     * creating the intermediate per-trigger map and per-stage list on demand.
-     *
-     * @param triggerName name of the trigger the listener is attached to
-     * @param stage       processing stage the listener should be fired for
-     * @param listener    listener instance to append to the stage's list
-     */
-    private void addPerStage(String triggerName, TriggerEventProcessorStage stage, TriggerListener listener) {
-      listenersPerStage
-          .computeIfAbsent(triggerName, unusedName -> new HashMap<>())
-          .computeIfAbsent(stage, unusedStage -> new ArrayList<>(3))
-          .add(listener);
-    }
-
-    /**
-     * Empties the registry while holding {@code updateLock}: clears the per-stage
-     * index, quietly closes every listener registered by name, then clears the
-     * per-name map.
-     */
-    void reset() {
-      updateLock.lock();
-      try {
-        listenersPerStage.clear();
-        listenersPerName.values().forEach(registered -> IOUtils.closeQuietly(registered));
-        listenersPerName.clear();
-      } finally {
-        updateLock.unlock();
-      }
-    }
-
-    void close() {
-      reset(); // closing simply delegates to reset(), which clears both maps and closes the listeners
-    }
-
-    List<TriggerListener> getTriggerListeners(String trigger, TriggerEventProcessorStage stage) {
-      Map<TriggerEventProcessorStage, List<TriggerListener>> perStage = listenersPerStage.get(trigger);
-      if (perStage == null) {
-        return Collections.emptyList();
-      }
-      List<TriggerListener> lst = perStage.get(stage);
-      if (lst == null) {
-        return Collections.emptyList();
-      } else {
-        return Collections.unmodifiableList(lst);
-      }
-    }
-
-    void fireListeners(String trigger, TriggerEvent event, TriggerEventProcessorStage stage) {
-      fireListeners(trigger, event, stage, null, null, null, null); // convenience overload: no action name, context, error or message
-    }
-
-    void fireListeners(String trigger, TriggerEvent event, TriggerEventProcessorStage stage, String message) {
-      fireListeners(trigger, event, stage, null, null, null, message); // convenience overload: message only; no action name, context or error
-    }
-
-    void fireListeners(String trigger, TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
-                       ActionContext context) {
-      fireListeners(trigger, event, stage, actionName, context, null, null); // convenience overload: action name and context; no error or message
-    }
-
-    void fireListeners(String trigger, TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
-                       ActionContext context, Throwable error, String message) {
-      updateLock.lock();
-      try {
-        for (TriggerListener listener : getTriggerListeners(trigger, stage)) {
-          if (!listener.isEnabled()) {
-            continue;
-          }
-          if (actionName != null) {
-            AutoScalingConfig.TriggerListenerConfig config = listener.getConfig();
-            if (stage == TriggerEventProcessorStage.BEFORE_ACTION) {
-              if (!config.beforeActions.contains(actionName)) {
-                continue;
-              }
-            } else if (stage == TriggerEventProcessorStage.AFTER_ACTION) {
-              if (!config.afterActions.contains(actionName)) {
-                continue;
-              }
-            }
-          }
-          try {
-            listener.onEvent(event, stage, actionName, context, error, message);
-          } catch (Exception e) {
-            log.warn("Exception running listener " + listener.getConfig(), e);
-          }
-        }
-      } finally {
-        updateLock.unlock();
-      }
-    }
-  }
-}