You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/10/23 00:05:57 UTC
[36/52] [abbrv] [partial] lucene-solr:jira/gradle: Add gradle support
for Solr
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
deleted file mode 100644
index 052b4c4..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud.autoscaling;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.net.ConnectException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
-import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
-import org.apache.solr.client.solrj.cloud.DistribStateManager;
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
-import org.apache.solr.common.SolrCloseable;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.IOUtils;
-import org.apache.solr.common.util.Utils;
-import org.apache.solr.core.CloudConfig;
-import org.apache.solr.core.SolrResourceLoader;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.common.cloud.ZkStateReader.SOLR_AUTOSCALING_CONF_PATH;
-
-/**
- * Overseer thread responsible for reading triggers from zookeeper and
- * adding/removing them from {@link ScheduledTriggers}
- */
-public class OverseerTriggerThread implements Runnable, SolrCloseable {
-
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private final SolrCloudManager cloudManager;
-
- private final CloudConfig cloudConfig;
-
- private final ScheduledTriggers scheduledTriggers;
-
- private final AutoScaling.TriggerFactory triggerFactory;
-
- private final ReentrantLock updateLock = new ReentrantLock();
-
- private final Condition updated = updateLock.newCondition();
-
- /*
- Following variables are only accessed or modified when updateLock is held
- */
- private int znodeVersion = -1;
-
- private Map<String, AutoScaling.Trigger> activeTriggers = new HashMap<>();
-
- private volatile boolean isClosed = false;
-
- private AutoScalingConfig autoScalingConfig;
-
- public OverseerTriggerThread(SolrResourceLoader loader, SolrCloudManager cloudManager, CloudConfig cloudConfig) {
- this.cloudManager = cloudManager;
- this.cloudConfig = cloudConfig;
- scheduledTriggers = new ScheduledTriggers(loader, cloudManager);
- triggerFactory = new AutoScaling.TriggerFactoryImpl(loader, cloudManager);
- }
-
- @Override
- public void close() throws IOException {
- updateLock.lock();
- try {
- isClosed = true;
- activeTriggers.clear();
- updated.signalAll();
- } finally {
- updateLock.unlock();
- }
- IOUtils.closeQuietly(triggerFactory);
- IOUtils.closeQuietly(scheduledTriggers);
- log.debug("OverseerTriggerThread has been closed explicitly");
- }
-
- /**
- * For tests.
- * @lucene.internal
- * @return current {@link ScheduledTriggers} instance
- */
- public ScheduledTriggers getScheduledTriggers() {
- return scheduledTriggers;
- }
-
- @Override
- public boolean isClosed() {
- return isClosed;
- }
-
- @Override
- public void run() {
- int lastZnodeVersion = znodeVersion;
-
- // we automatically add a trigger for auto add replicas if it does not exists already
- // we also automatically add a scheduled maintenance trigger
- while (!isClosed) {
- try {
- if (Thread.currentThread().isInterrupted()) {
- log.warn("Interrupted");
- break;
- }
- AutoScalingConfig autoScalingConfig = cloudManager.getDistribStateManager().getAutoScalingConfig();
- AutoScalingConfig updatedConfig = withAutoAddReplicasTrigger(autoScalingConfig);
- updatedConfig = withScheduledMaintenanceTrigger(updatedConfig);
- if (updatedConfig.equals(autoScalingConfig)) break;
- log.debug("Adding .auto_add_replicas and .scheduled_maintenance triggers");
- cloudManager.getDistribStateManager().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(updatedConfig), updatedConfig.getZkVersion());
- break;
- } catch (BadVersionException bve) {
- // somebody else has changed the configuration so we must retry
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.warn("Interrupted", e);
- break;
- }
- catch (IOException | KeeperException e) {
- if (e instanceof KeeperException.SessionExpiredException ||
- (e.getCause()!=null && e.getCause() instanceof KeeperException.SessionExpiredException)) {
- log.warn("Solr cannot talk to ZK, exiting " +
- getClass().getSimpleName() + " main queue loop", e);
- return;
- } else {
- log.error("A ZK error has occurred", e);
- }
- }
- }
-
- if (isClosed || Thread.currentThread().isInterrupted()) return;
-
- try {
- refreshAutoScalingConf(new AutoScalingWatcher());
- } catch (ConnectException e) {
- log.warn("ZooKeeper watch triggered for autoscaling conf, but Solr cannot talk to ZK: [{}]", e.getMessage());
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.warn("Interrupted", e);
- } catch (Exception e) {
- log.error("Unexpected exception", e);
- }
-
- while (true) {
- Map<String, AutoScaling.Trigger> copy = null;
- try {
- // this can throw InterruptedException and we don't want to unlock if it did, so we keep this outside
- // of the try/finally block
- updateLock.lockInterruptibly();
-
- // must check for close here before we await on the condition otherwise we can only be woken up on interruption
- if (isClosed) {
- log.warn("OverseerTriggerThread has been closed, exiting.");
- break;
- }
-
- log.debug("Current znodeVersion {}, lastZnodeVersion {}", znodeVersion, lastZnodeVersion);
-
- try {
- if (znodeVersion == lastZnodeVersion) {
- updated.await();
-
- // are we closed?
- if (isClosed) {
- log.warn("OverseerTriggerThread woken up but we are closed, exiting.");
- break;
- }
-
- // spurious wakeup?
- if (znodeVersion == lastZnodeVersion) continue;
- }
- copy = new HashMap<>(activeTriggers);
- lastZnodeVersion = znodeVersion;
- log.debug("Processed trigger updates upto znodeVersion {}", znodeVersion);
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.warn("Interrupted", e);
- break;
- } finally {
- updateLock.unlock();
- }
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.warn("Interrupted", e);
- break;
- }
-
- // update the current config
- scheduledTriggers.setAutoScalingConfig(autoScalingConfig);
-
- Set<String> managedTriggerNames = scheduledTriggers.getScheduledTriggerNames();
- // remove the triggers which are no longer active
- for (String managedTriggerName : managedTriggerNames) {
- if (!copy.containsKey(managedTriggerName)) {
- scheduledTriggers.remove(managedTriggerName);
- }
- }
- // check for nodeLost triggers in the current config, and if
- // absent then clean up old nodeLost / nodeAdded markers
- boolean cleanOldNodeLostMarkers = true;
- boolean cleanOldNodeAddedMarkers = true;
- try {
- // add new triggers and/or replace and close the replaced triggers
- for (Map.Entry<String, AutoScaling.Trigger> entry : copy.entrySet()) {
- if (entry.getValue().getEventType().equals(TriggerEventType.NODELOST)) {
- cleanOldNodeLostMarkers = false;
- }
- if (entry.getValue().getEventType().equals(TriggerEventType.NODEADDED)) {
- cleanOldNodeAddedMarkers = false;
- }
- try {
- scheduledTriggers.add(entry.getValue());
- } catch (Exception e) {
- log.warn("Exception initializing trigger " + entry.getKey() + ", configuration ignored", e);
- }
- }
- } catch (AlreadyClosedException e) {
- // this _should_ mean that we're closing, complain loudly if that's not the case
- if (isClosed) {
- return;
- } else {
- throw new IllegalStateException("Caught AlreadyClosedException from ScheduledTriggers, but we're not closed yet!", e);
- }
- }
- DistribStateManager stateManager = cloudManager.getDistribStateManager();
- if (cleanOldNodeLostMarkers) {
- log.debug("-- clean old nodeLost markers");
- try {
- List<String> markers = stateManager.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH);
- markers.forEach(n -> {
- removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH, n);
- });
- } catch (NoSuchElementException e) {
- // ignore
- } catch (Exception e) {
- log.warn("Error removing old nodeLost markers", e);
- }
- }
- if (cleanOldNodeAddedMarkers) {
- log.debug("-- clean old nodeAdded markers");
- try {
- List<String> markers = stateManager.listData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH);
- markers.forEach(n -> {
- removeNodeMarker(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH, n);
- });
- } catch (NoSuchElementException e) {
- // ignore
- } catch (Exception e) {
- log.warn("Error removing old nodeAdded markers", e);
- }
-
- }
- }
- }
-
- private void removeNodeMarker(String path, String nodeName) {
- path = path + "/" + nodeName;
- try {
- cloudManager.getDistribStateManager().removeData(path, -1);
- log.debug(" -- deleted " + path);
- } catch (NoSuchElementException e) {
- // ignore
- } catch (Exception e) {
- log.warn("Error removing old marker " + path, e);
- }
- }
-
- class AutoScalingWatcher implements Watcher {
- @Override
- public void process(WatchedEvent watchedEvent) {
- // session events are not change events, and do not remove the watcher
- if (Event.EventType.None.equals(watchedEvent.getType())) {
- return;
- }
-
- try {
- refreshAutoScalingConf(this);
- } catch (ConnectException e) {
- log.warn("ZooKeeper watch triggered for autoscaling conf, but we cannot talk to ZK: [{}]", e.getMessage());
- } catch (InterruptedException e) {
- // Restore the interrupted status
- Thread.currentThread().interrupt();
- log.warn("Interrupted", e);
- } catch (Exception e) {
- log.error("Unexpected exception", e);
- }
- }
-
- }
-
- private void refreshAutoScalingConf(Watcher watcher) throws InterruptedException, IOException {
- updateLock.lock();
- try {
- if (isClosed) {
- return;
- }
- AutoScalingConfig currentConfig = cloudManager.getDistribStateManager().getAutoScalingConfig(watcher);
- log.debug("Refreshing {} with znode version {}", ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, currentConfig.getZkVersion());
- if (znodeVersion >= currentConfig.getZkVersion()) {
- // protect against reordered watcher fires by ensuring that we only move forward
- return;
- }
- autoScalingConfig = currentConfig;
- znodeVersion = autoScalingConfig.getZkVersion();
- Map<String, AutoScaling.Trigger> triggerMap = loadTriggers(triggerFactory, autoScalingConfig);
-
- // remove all active triggers that have been removed from ZK
- Set<String> trackingKeySet = activeTriggers.keySet();
- trackingKeySet.retainAll(triggerMap.keySet());
-
- // now lets add or remove triggers which have been enabled or disabled respectively
- for (Map.Entry<String, AutoScaling.Trigger> entry : triggerMap.entrySet()) {
- String triggerName = entry.getKey();
- AutoScaling.Trigger trigger = entry.getValue();
- if (trigger.isEnabled()) {
- activeTriggers.put(triggerName, trigger);
- } else {
- activeTriggers.remove(triggerName);
- }
- }
- updated.signalAll();
- } finally {
- updateLock.unlock();
- }
- }
-
- private AutoScalingConfig withAutoAddReplicasTrigger(AutoScalingConfig autoScalingConfig) {
- Map<String, Object> triggerProps = AutoScaling.AUTO_ADD_REPLICAS_TRIGGER_PROPS;
- return withDefaultTrigger(triggerProps, autoScalingConfig);
- }
-
- private AutoScalingConfig withScheduledMaintenanceTrigger(AutoScalingConfig autoScalingConfig) {
- Map<String, Object> triggerProps = AutoScaling.SCHEDULED_MAINTENANCE_TRIGGER_PROPS;
- return withDefaultTrigger(triggerProps, autoScalingConfig);
- }
-
- private AutoScalingConfig withDefaultTrigger(Map<String, Object> triggerProps, AutoScalingConfig autoScalingConfig) {
- String triggerName = (String) triggerProps.get("name");
- Map<String, AutoScalingConfig.TriggerConfig> configs = autoScalingConfig.getTriggerConfigs();
- for (AutoScalingConfig.TriggerConfig cfg : configs.values()) {
- if (triggerName.equals(cfg.name)) {
- // already has this trigger
- return autoScalingConfig;
- }
- }
- // need to add
- triggerProps.computeIfPresent("waitFor", (k, v) -> (long) (cloudConfig.getAutoReplicaFailoverWaitAfterExpiration() / 1000));
- AutoScalingConfig.TriggerConfig config = new AutoScalingConfig.TriggerConfig(triggerName, triggerProps);
- autoScalingConfig = autoScalingConfig.withTriggerConfig(config);
- // need to add SystemLogListener explicitly here
- autoScalingConfig = AutoScalingHandler.withSystemLogListener(autoScalingConfig, triggerName);
- return autoScalingConfig;
- }
-
- private static Map<String, AutoScaling.Trigger> loadTriggers(AutoScaling.TriggerFactory triggerFactory, AutoScalingConfig autoScalingConfig) {
- Map<String, AutoScalingConfig.TriggerConfig> triggers = autoScalingConfig.getTriggerConfigs();
- if (triggers == null) {
- return Collections.emptyMap();
- }
-
- Map<String, AutoScaling.Trigger> triggerMap = new HashMap<>(triggers.size());
-
- for (Map.Entry<String, AutoScalingConfig.TriggerConfig> entry : triggers.entrySet()) {
- AutoScalingConfig.TriggerConfig cfg = entry.getValue();
- TriggerEventType eventType = cfg.event;
- String triggerName = entry.getKey();
- try {
- triggerMap.put(triggerName, triggerFactory.create(eventType, triggerName, cfg.properties));
- } catch (TriggerValidationException e) {
- log.warn("Error in trigger '" + triggerName + "' configuration, trigger config ignored: " + cfg, e);
- }
- }
- return triggerMap;
- }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTrigger.java
deleted file mode 100644
index 5e25542..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTrigger.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud.autoscaling;
-
-import java.lang.invoke.MethodHandles;
-import java.text.ParseException;
-import java.time.Instant;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeFormatterBuilder;
-import java.time.temporal.ChronoField;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Locale;
-import java.util.Map;
-import java.util.TimeZone;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.params.AutoScalingParams;
-import org.apache.solr.common.util.TimeSource;
-import org.apache.solr.core.SolrResourceLoader;
-import org.apache.solr.util.DateMathParser;
-import org.apache.solr.util.TimeZoneUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.common.params.AutoScalingParams.PREFERRED_OP;
-
-/**
- * A trigger which creates {@link TriggerEventType#SCHEDULED} events as per the configured schedule
- */
-public class ScheduledTrigger extends TriggerBase {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private static final String DEFAULT_GRACE_DURATION = "+15MINUTES";
- private static final String LAST_RUN_AT = "lastRunAt";
- static final String ACTUAL_EVENT_TIME = "actualEventTime";
-
- private String everyStr;
-
- private String graceDurationStr;
-
- private String preferredOp;
-
- private TimeZone timeZone;
-
- private Instant lastRunAt;
-
- public ScheduledTrigger(String name) {
- super(TriggerEventType.SCHEDULED, name);
- TriggerUtils.requiredProperties(requiredProperties, validProperties, "startTime");
- TriggerUtils.validProperties(validProperties, "timeZone", "every", "graceDuration", AutoScalingParams.PREFERRED_OP);
- }
-
- @Override
- public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException {
- super.configure(loader, cloudManager, properties);
- String timeZoneStr = (String) properties.get("timeZone");
- this.timeZone = TimeZoneUtils.parseTimezone(timeZoneStr); // defaults to UTC
-
- String startTimeStr = (String) properties.get("startTime");
- this.everyStr = (String) properties.get("every");
- this.graceDurationStr = (String) properties.getOrDefault("graceDuration", DEFAULT_GRACE_DURATION);
-
- preferredOp = (String) properties.get(PREFERRED_OP);
-
- // attempt parsing to validate date math strings
- // explicitly set NOW because it may be different for simulated time
- Date now = new Date(TimeUnit.NANOSECONDS.toMillis(cloudManager.getTimeSource().getEpochTimeNs()));
- Instant startTime = parseStartTime(now, startTimeStr, timeZoneStr);
- DateMathParser.parseMath(now, startTime + everyStr, timeZone);
- DateMathParser.parseMath(now, startTime + graceDurationStr, timeZone);
-
- // We set lastRunAt to be the startTime (which could be a date math expression such as 'NOW')
- // Ordinarily, NOW will always be evaluated in this constructor so it may seem that
- // the trigger will always fire the first time.
- // However, the lastRunAt is overwritten with the value from ZK
- // during restoreState() operation (which is performed before run()) so the trigger works correctly
- this.lastRunAt = startTime;
- }
-
- private Instant parseStartTime(Date now, String startTimeStr, String timeZoneStr) throws TriggerValidationException {
- try {
- // try parsing startTime as an ISO-8601 date time string
- return DateMathParser.parseMath(now, startTimeStr).toInstant();
- } catch (SolrException e) {
- if (e.code() != SolrException.ErrorCode.BAD_REQUEST.code) {
- throw new TriggerValidationException("startTime", "error parsing value '" + startTimeStr + "': " + e.toString());
- }
- }
- if (timeZoneStr == null) {
- throw new TriggerValidationException("timeZone",
- "Either 'startTime' should be an ISO-8601 date time string or 'timeZone' must be not be null");
- }
- TimeZone timeZone = TimeZone.getTimeZone(timeZoneStr);
- DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder()
- .append(DateTimeFormatter.ISO_LOCAL_DATE).appendPattern("['T'[HH[:mm[:ss]]]]")
- .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
- .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
- .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
- .toFormatter(Locale.ROOT).withZone(timeZone.toZoneId());
- try {
- return Instant.from(dateTimeFormatter.parse(startTimeStr));
- } catch (Exception e) {
- throw new TriggerValidationException("startTime", "error parsing startTime '" + startTimeStr + "': " + e.toString());
- }
- }
-
- @Override
- protected Map<String, Object> getState() {
- return Collections.singletonMap(LAST_RUN_AT, lastRunAt.toEpochMilli());
- }
-
- @Override
- protected void setState(Map<String, Object> state) {
- if (state.containsKey(LAST_RUN_AT)) {
- this.lastRunAt = Instant.ofEpochMilli((Long) state.get(LAST_RUN_AT));
- }
- }
-
- @Override
- public void restoreState(AutoScaling.Trigger old) {
- assert old.isClosed();
- if (old instanceof ScheduledTrigger) {
- ScheduledTrigger scheduledTrigger = (ScheduledTrigger) old;
- this.lastRunAt = scheduledTrigger.lastRunAt;
- } else {
- throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
- "Unable to restore state from an unknown type of trigger");
- }
- }
-
- @Override
- public void run() {
- synchronized (this) {
- if (isClosed) {
- log.warn("ScheduledTrigger ran but was already closed");
- throw new RuntimeException("Trigger has been closed");
- }
- }
-
- TimeSource timeSource = cloudManager.getTimeSource();
- DateMathParser dateMathParser = new DateMathParser(timeZone);
- dateMathParser.setNow(new Date(lastRunAt.toEpochMilli()));
- Instant nextRunTime, nextPlusGrace;
- try {
- Date next = dateMathParser.parseMath(everyStr);
- dateMathParser.setNow(next);
- nextPlusGrace = dateMathParser.parseMath(graceDurationStr).toInstant();
- nextRunTime = next.toInstant();
- } catch (ParseException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Unable to calculate next run time. lastRan: " + lastRunAt.toString() + " and date math string: " + everyStr, e);
- }
-
- Instant now = Instant.ofEpochMilli(
- TimeUnit.NANOSECONDS.toMillis(timeSource.getEpochTimeNs()));
- AutoScaling.TriggerEventProcessor processor = processorRef.get();
-
- if (now.isBefore(nextRunTime)) {
- return; // it's not time yet
- }
- if (now.isAfter(nextPlusGrace)) {
- // we are past time and we could not run per schedule so skip this event
- if (log.isWarnEnabled()) {
- log.warn("ScheduledTrigger was not able to run event at scheduled time: {}. Now: {}",
- nextRunTime, now);
- }
- // Even though we are skipping the event, we need to notify any listeners of the IGNORED stage
- // so we create a dummy event with the ignored=true flag and ScheduledTriggers will do the rest
- if (processor != null && processor.process(new ScheduledEvent(getEventType(), getName(), timeSource.getTimeNs(),
- preferredOp, now.toEpochMilli(), true))) {
- lastRunAt = nextRunTime;
- return;
- }
- }
-
- if (processor != null) {
- if (log.isDebugEnabled()) {
- log.debug("ScheduledTrigger {} firing registered processor for scheduled time {}, now={}", name,
- nextRunTime, now);
- }
- if (processor.process(new ScheduledEvent(getEventType(), getName(), timeSource.getTimeNs(),
- preferredOp, now.toEpochMilli()))) {
- lastRunAt = nextRunTime; // set to nextRunTime instead of now to avoid drift
- }
- } else {
- lastRunAt = nextRunTime; // set to nextRunTime instead of now to avoid drift
- }
- }
-
- public static class ScheduledEvent extends TriggerEvent {
- public ScheduledEvent(TriggerEventType eventType, String source, long eventTime, String preferredOp, long actualEventTime) {
- this(eventType, source, eventTime, preferredOp, actualEventTime, false);
- }
-
- public ScheduledEvent(TriggerEventType eventType, String source, long eventTime, String preferredOp, long actualEventTime, boolean ignored) {
- super(eventType, source, eventTime, null, ignored);
- if (preferredOp != null) {
- properties.put(PREFERRED_OP, preferredOp);
- }
- properties.put(ACTUAL_EVENT_TIME, actualEventTime);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
deleted file mode 100644
index 7c3cbb0..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ /dev/null
@@ -1,802 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud.autoscaling;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.lucene.store.AlreadyClosedException;
-import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
-import org.apache.solr.client.solrj.cloud.DistribStateManager;
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
-import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest.RequestStatusResponse;
-import org.apache.solr.client.solrj.response.RequestStatusState;
-import org.apache.solr.cloud.Stats;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.IOUtils;
-import org.apache.solr.common.util.Utils;
-import org.apache.solr.core.SolrResourceLoader;
-import org.apache.solr.util.DefaultSolrThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.solr.cloud.autoscaling.ExecutePlanAction.waitForTaskToFinish;
-import static org.apache.solr.common.params.AutoScalingParams.ACTION_THROTTLE_PERIOD_SECONDS;
-import static org.apache.solr.common.params.AutoScalingParams.TRIGGER_COOLDOWN_PERIOD_SECONDS;
-import static org.apache.solr.common.params.AutoScalingParams.TRIGGER_CORE_POOL_SIZE;
-import static org.apache.solr.common.params.AutoScalingParams.TRIGGER_SCHEDULE_DELAY_SECONDS;
-import static org.apache.solr.common.util.ExecutorUtil.awaitTermination;
-
-/**
- * Responsible for scheduling active triggers, starting and stopping them and
- * performing actions when they fire
- */
-public class ScheduledTriggers implements Closeable {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- public static final int DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS = 1;
- public static final int DEFAULT_ACTION_THROTTLE_PERIOD_SECONDS = 5;
- public static final int DEFAULT_COOLDOWN_PERIOD_SECONDS = 5;
- public static final int DEFAULT_TRIGGER_CORE_POOL_SIZE = 4;
-
- static final Map<String, Object> DEFAULT_PROPERTIES = new HashMap<>();
-
- static {
- DEFAULT_PROPERTIES.put(TRIGGER_SCHEDULE_DELAY_SECONDS, DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS);
- DEFAULT_PROPERTIES.put(TRIGGER_COOLDOWN_PERIOD_SECONDS, DEFAULT_COOLDOWN_PERIOD_SECONDS);
- DEFAULT_PROPERTIES.put(TRIGGER_CORE_POOL_SIZE, DEFAULT_TRIGGER_CORE_POOL_SIZE);
- DEFAULT_PROPERTIES.put(ACTION_THROTTLE_PERIOD_SECONDS, DEFAULT_ACTION_THROTTLE_PERIOD_SECONDS);
- }
-
- private final Map<String, TriggerWrapper> scheduledTriggerWrappers = new ConcurrentHashMap<>();
-
- /**
- * Thread pool for scheduling the triggers
- */
- private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
-
- /**
- * Single threaded executor to run the actions upon a trigger event. We rely on this being a single
- * threaded executor to ensure that trigger fires do not step on each other as well as to ensure
- * that we do not run scheduled trigger threads while an action has been submitted to this executor
- */
- private final ExecutorService actionExecutor;
-
- private boolean isClosed = false;
-
- private final AtomicBoolean hasPendingActions = new AtomicBoolean(false);
-
- private final AtomicLong cooldownStart = new AtomicLong();
-
- private final AtomicLong cooldownPeriod = new AtomicLong(TimeUnit.SECONDS.toNanos(DEFAULT_COOLDOWN_PERIOD_SECONDS));
-
- private final AtomicLong triggerDelay = new AtomicLong(DEFAULT_SCHEDULED_TRIGGER_DELAY_SECONDS);
-
- private final SolrCloudManager cloudManager;
-
- private final DistribStateManager stateManager;
-
- private final SolrResourceLoader loader;
-
- private final Stats queueStats;
-
- private final TriggerListeners listeners;
-
- private AutoScalingConfig autoScalingConfig;
-
- public ScheduledTriggers(SolrResourceLoader loader, SolrCloudManager cloudManager) {
- scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(DEFAULT_TRIGGER_CORE_POOL_SIZE,
- new DefaultSolrThreadFactory("ScheduledTrigger"));
- scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
- scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
- actionExecutor = ExecutorUtil.newMDCAwareSingleThreadExecutor(new DefaultSolrThreadFactory("AutoscalingActionExecutor"));
- this.cloudManager = cloudManager;
- this.stateManager = cloudManager.getDistribStateManager();
- this.loader = loader;
- queueStats = new Stats();
- listeners = new TriggerListeners();
- // initialize cooldown timer
- cooldownStart.set(cloudManager.getTimeSource().getTimeNs() - cooldownPeriod.get());
- }
-
- /**
- * Set the current autoscaling config. This is invoked by {@link OverseerTriggerThread} when autoscaling.json is updated,
- * and it re-initializes trigger listeners and other properties used by the framework
- * @param autoScalingConfig current autoscaling.json
- */
- public void setAutoScalingConfig(AutoScalingConfig autoScalingConfig) {
- Map<String, Object> currentProps = new HashMap<>(DEFAULT_PROPERTIES);
- if (this.autoScalingConfig != null) {
- currentProps.putAll(this.autoScalingConfig.getProperties());
- }
-
- // reset listeners early in order to capture first execution of newly scheduled triggers
- listeners.setAutoScalingConfig(autoScalingConfig);
-
- for (Map.Entry<String, Object> entry : currentProps.entrySet()) {
- Map<String, Object> newProps = autoScalingConfig.getProperties();
- String key = entry.getKey();
- if (newProps.containsKey(key) && !entry.getValue().equals(newProps.get(key))) {
- log.debug("Changing value of autoscaling property: {} from: {} to: {}", key, entry.getValue(), newProps.get(key));
- switch (key) {
- case TRIGGER_SCHEDULE_DELAY_SECONDS:
- triggerDelay.set(((Number) newProps.get(key)).intValue());
- synchronized (this) {
- scheduledTriggerWrappers.forEach((s, triggerWrapper) -> {
- if (triggerWrapper.scheduledFuture.cancel(false)) {
- triggerWrapper.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(
- triggerWrapper, 0,
- cloudManager.getTimeSource().convertDelay(TimeUnit.SECONDS, triggerDelay.get(), TimeUnit.MILLISECONDS),
- TimeUnit.MILLISECONDS);
- } else {
- log.debug("Failed to cancel scheduled task: {}", s);
- }
- });
- }
- break;
- case TRIGGER_COOLDOWN_PERIOD_SECONDS:
- cooldownPeriod.set(TimeUnit.SECONDS.toNanos(((Number) newProps.get(key)).longValue()));
- break;
- case TRIGGER_CORE_POOL_SIZE:
- this.scheduledThreadPoolExecutor.setCorePoolSize(((Number) newProps.get(key)).intValue());
- break;
- }
- }
- }
-
- this.autoScalingConfig = autoScalingConfig;
- // reset cooldown
- cooldownStart.set(cloudManager.getTimeSource().getTimeNs() - cooldownPeriod.get());
- }
-
- /**
- * Adds a new trigger or replaces an existing one. The replaced trigger, if any, is closed
- * <b>before</b> the new trigger is run. If a trigger is replaced with itself then this
- * operation becomes a no-op.
- *
- * @param newTrigger the trigger to be managed
- * @throws AlreadyClosedException if this class has already been closed
- */
- public synchronized void add(AutoScaling.Trigger newTrigger) throws Exception {
- if (isClosed) {
- throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used anymore");
- }
- TriggerWrapper st;
- try {
- st = new TriggerWrapper(newTrigger, cloudManager, queueStats);
- } catch (Exception e) {
- if (isClosed) {
- throw new AlreadyClosedException("ScheduledTriggers has been closed and cannot be used anymore");
- }
- if (cloudManager.isClosed()) {
- log.error("Failed to add trigger " + newTrigger.getName() + " - closing or disconnected from data provider", e);
- } else {
- log.error("Failed to add trigger " + newTrigger.getName(), e);
- }
- return;
- }
- TriggerWrapper triggerWrapper = st;
-
- TriggerWrapper old = scheduledTriggerWrappers.putIfAbsent(newTrigger.getName(), triggerWrapper);
- if (old != null) {
- if (old.trigger.equals(newTrigger)) {
- // the trigger wasn't actually modified so we do nothing
- return;
- }
- IOUtils.closeQuietly(old);
- newTrigger.restoreState(old.trigger);
- triggerWrapper.setReplay(false);
- scheduledTriggerWrappers.replace(newTrigger.getName(), triggerWrapper);
- }
- newTrigger.setProcessor(event -> {
- TriggerListeners triggerListeners = listeners.copy();
- if (cloudManager.isClosed()) {
- String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because Solr has been shutdown.", event.toString());
- log.warn(msg);
- triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
- return false;
- }
- TriggerWrapper scheduledSource = scheduledTriggerWrappers.get(event.getSource());
- if (scheduledSource == null) {
- String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the source trigger: %s doesn't exist.", event.toString(), event.getSource());
- triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.FAILED, msg);
- log.warn(msg);
- return false;
- }
- boolean replaying = event.getProperty(TriggerEvent.REPLAYING) != null ? (Boolean)event.getProperty(TriggerEvent.REPLAYING) : false;
- AutoScaling.Trigger source = scheduledSource.trigger;
- if (scheduledSource.isClosed || source.isClosed()) {
- String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the source trigger: %s has already been closed", event.toString(), source);
- triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
- log.warn(msg);
- // we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
- return false;
- }
- if (event.isIgnored()) {
- log.debug("-------- Ignoring event: " + event);
- event.getProperties().put(TriggerEvent.IGNORED, true);
- triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.IGNORED, "Event was ignored.");
- return true; // always return true for ignored events
- }
- // even though we pause all triggers during action execution there is a possibility that a trigger was already
- // running at the time and would have already created an event so we reject such events during cooldown period
- if (cooldownStart.get() + cooldownPeriod.get() > cloudManager.getTimeSource().getTimeNs()) {
- log.debug("-------- Cooldown period - rejecting event: " + event);
- event.getProperties().put(TriggerEvent.COOLDOWN, true);
- triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.IGNORED, "In cooldown period.");
- return false;
- } else {
- log.debug("++++++++ Cooldown inactive - processing event: " + event);
- }
- if (hasPendingActions.compareAndSet(false, true)) {
- // pause all triggers while we execute actions so triggers do not operate on a cluster in transition
- pauseTriggers();
-
- final boolean enqueued;
- if (replaying) {
- enqueued = false;
- } else {
- enqueued = triggerWrapper.enqueue(event);
- }
- // fire STARTED event listeners after enqueuing the event is successful
- triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.STARTED);
- List<TriggerAction> actions = source.getActions();
- if (actions != null) {
- if (actionExecutor.isShutdown()) {
- String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s from trigger %s because the executor has already been closed", event.toString(), source);
- triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
- log.warn(msg);
- // we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
- return false;
- }
- actionExecutor.submit(() -> {
- assert hasPendingActions.get();
- long eventProcessingStart = cloudManager.getTimeSource().getTimeNs();
- TriggerListeners triggerListeners1 = triggerListeners.copy();
- log.debug("-- processing actions for " + event);
- try {
- // in future, we could wait for pending tasks in a different thread and re-enqueue
- // this event so that we continue processing other events and not block this action executor
- waitForPendingTasks(newTrigger, actions);
-
- ActionContext actionContext = new ActionContext(cloudManager, newTrigger, new HashMap<>());
- for (TriggerAction action : actions) {
- List<String> beforeActions = (List<String>) actionContext.getProperties().computeIfAbsent(TriggerEventProcessorStage.BEFORE_ACTION.toString(), k -> new ArrayList<String>());
- beforeActions.add(action.getName());
- triggerListeners1.fireListeners(event.getSource(), event, TriggerEventProcessorStage.BEFORE_ACTION, action.getName(), actionContext);
- try {
- action.process(event, actionContext);
- } catch (Exception e) {
- triggerListeners1.fireListeners(event.getSource(), event, TriggerEventProcessorStage.FAILED, action.getName(), actionContext, e, null);
- throw new TriggerActionException(event.getSource(), action.getName(), "Error processing action for trigger event: " + event, e);
- }
- List<String> afterActions = (List<String>) actionContext.getProperties().computeIfAbsent(TriggerEventProcessorStage.AFTER_ACTION.toString(), k -> new ArrayList<String>());
- afterActions.add(action.getName());
- triggerListeners1.fireListeners(event.getSource(), event, TriggerEventProcessorStage.AFTER_ACTION, action.getName(), actionContext);
- }
- if (enqueued) {
- TriggerEvent ev = triggerWrapper.dequeue();
- assert ev.getId().equals(event.getId());
- }
- triggerListeners1.fireListeners(event.getSource(), event, TriggerEventProcessorStage.SUCCEEDED);
- } catch (TriggerActionException e) {
- log.warn("Exception executing actions", e);
- } catch (Exception e) {
- triggerListeners1.fireListeners(event.getSource(), event, TriggerEventProcessorStage.FAILED);
- log.warn("Unhandled exception executing actions", e);
- } finally {
- cooldownStart.set(cloudManager.getTimeSource().getTimeNs());
- hasPendingActions.set(false);
- // resume triggers after cool down period
- resumeTriggers(cloudManager.getTimeSource().convertDelay(TimeUnit.NANOSECONDS, cooldownPeriod.get(), TimeUnit.MILLISECONDS));
- }
- log.debug("-- processing took {} ms for event id={}",
- TimeUnit.NANOSECONDS.toMillis(cloudManager.getTimeSource().getTimeNs() - eventProcessingStart), event.id);
- });
- } else {
- if (enqueued) {
- TriggerEvent ev = triggerWrapper.dequeue();
- if (!ev.getId().equals(event.getId())) {
- throw new RuntimeException("Wrong event dequeued, queue of " + triggerWrapper.trigger.getName()
- + " is broken! Expected event=" + event + " but got " + ev);
- }
- }
- triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.SUCCEEDED);
- hasPendingActions.set(false);
- // resume triggers now
- resumeTriggers(0);
- }
- return true;
- } else {
- // there is an action in the queue and we don't want to enqueue another until it is complete
- triggerListeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.IGNORED, "Already processing another event.");
- return false;
- }
- });
- newTrigger.init(); // mark as ready for scheduling
- triggerWrapper.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(triggerWrapper, 0,
- cloudManager.getTimeSource().convertDelay(TimeUnit.SECONDS, triggerDelay.get(), TimeUnit.MILLISECONDS),
- TimeUnit.MILLISECONDS);
- }
-
- /**
- * Pauses all scheduled trigger invocations without interrupting any that are in progress
- * @lucene.internal
- */
- public synchronized void pauseTriggers() {
- if (log.isDebugEnabled()) {
- log.debug("Pausing all triggers: {}", scheduledTriggerWrappers.keySet());
- }
- scheduledTriggerWrappers.forEach((s, triggerWrapper) -> triggerWrapper.scheduledFuture.cancel(false));
- }
-
- /**
- * Resumes all previously cancelled triggers to be scheduled after the given initial delay
- * @param afterDelayMillis the initial delay in milliseconds after which triggers should be resumed
- * @lucene.internal
- */
- public synchronized void resumeTriggers(long afterDelayMillis) {
- scheduledTriggerWrappers.forEach((s, triggerWrapper) -> {
- if (triggerWrapper.scheduledFuture.isCancelled()) {
- log.debug("Resuming trigger: {} after {}ms", s, afterDelayMillis);
- triggerWrapper.scheduledFuture = scheduledThreadPoolExecutor.scheduleWithFixedDelay(triggerWrapper, afterDelayMillis,
- cloudManager.getTimeSource().convertDelay(TimeUnit.SECONDS, triggerDelay.get(), TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
- }
- });
- }
-
- private void waitForPendingTasks(AutoScaling.Trigger newTrigger, List<TriggerAction> actions) throws AlreadyClosedException {
- DistribStateManager stateManager = cloudManager.getDistribStateManager();
- try {
-
- for (TriggerAction action : actions) {
- if (action instanceof ExecutePlanAction) {
- String parentPath = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + newTrigger.getName() + "/" + action.getName();
- if (!stateManager.hasData(parentPath)) {
- break;
- }
- List<String> children = stateManager.listData(parentPath);
- if (children != null) {
- for (String child : children) {
- String path = parentPath + '/' + child;
- VersionedData data = stateManager.getData(path, null);
- if (data != null) {
- Map map = (Map) Utils.fromJSON(data.getData());
- String requestid = (String) map.get("requestid");
- try {
- log.debug("Found pending task with requestid={}", requestid);
- RequestStatusResponse statusResponse = waitForTaskToFinish(cloudManager, requestid,
- ExecutePlanAction.DEFAULT_TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
- if (statusResponse != null) {
- RequestStatusState state = statusResponse.getRequestStatus();
- if (state == RequestStatusState.COMPLETED || state == RequestStatusState.FAILED || state == RequestStatusState.NOT_FOUND) {
- stateManager.removeData(path, -1);
- }
- }
- } catch (Exception e) {
- if (cloudManager.isClosed()) {
- throw e; // propagate the abort to the caller
- }
- Throwable rootCause = ExceptionUtils.getRootCause(e);
- if (rootCause instanceof IllegalStateException && rootCause.getMessage().contains("Connection pool shut down")) {
- throw e;
- }
- if (rootCause instanceof TimeoutException && rootCause.getMessage().contains("Could not connect to ZooKeeper")) {
- throw e;
- }
- log.error("Unexpected exception while waiting for pending task with requestid: " + requestid + " to finish", e);
- }
- }
- }
- }
- }
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Thread interrupted", e);
- } catch (Exception e) {
- if (cloudManager.isClosed()) {
- throw new AlreadyClosedException("The Solr instance has been shutdown");
- }
- // we catch but don't rethrow because a failure to wait for pending tasks
- // should not keep the actions from executing
- log.error("Unexpected exception while waiting for pending tasks to finish", e);
- }
- }
-
- /**
- * Remove and stop all triggers. Also cleans up any leftover
- * state / events in ZK.
- */
- public synchronized void removeAll() {
- getScheduledTriggerNames().forEach(t -> {
- log.info("-- removing trigger: " + t);
- remove(t);
- });
- }
-
- /**
- * Removes and stops the trigger with the given name. Also cleans up any leftover
- * state / events in ZK.
- *
- * @param triggerName the name of the trigger to be removed
- */
- public synchronized void remove(String triggerName) {
- TriggerWrapper removed = scheduledTriggerWrappers.remove(triggerName);
- IOUtils.closeQuietly(removed);
- removeTriggerZKData(triggerName);
- }
-
- private void removeTriggerZKData(String triggerName) {
- String statePath = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + triggerName;
- String eventsPath = ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH + "/" + triggerName;
- try {
- stateManager.removeRecursively(statePath, true, true);
- } catch (Exception e) {
- log.warn("Failed to remove state for removed trigger " + statePath, e);
- }
- try {
- stateManager.removeRecursively(eventsPath, true, true);
- } catch (Exception e) {
- log.warn("Failed to remove events for removed trigger " + eventsPath, e);
- }
- }
-
- /**
- * @return an unmodifiable set of names of all triggers being managed by this class
- */
- public synchronized Set<String> getScheduledTriggerNames() {
- return Collections.unmodifiableSet(new HashSet<>(scheduledTriggerWrappers.keySet())); // shallow copy
- }
-
- @Override
- public void close() throws IOException {
- synchronized (this) {
- // mark that we are closed
- isClosed = true;
- for (TriggerWrapper triggerWrapper : scheduledTriggerWrappers.values()) {
- IOUtils.closeQuietly(triggerWrapper);
- }
- scheduledTriggerWrappers.clear();
- }
- // shutdown and interrupt all running tasks because there's no longer any
- // guarantee about cluster state
- log.debug("Shutting down scheduled thread pool executor now");
- scheduledThreadPoolExecutor.shutdownNow();
-
- log.debug("Shutting down action executor now");
- actionExecutor.shutdownNow();
-
- listeners.close();
-
- log.debug("Awaiting termination for action executor");
- awaitTermination(actionExecutor);
-
- log.debug("Awaiting termination for scheduled thread pool executor");
- awaitTermination(scheduledThreadPoolExecutor);
-
- log.debug("ScheduledTriggers closed completely");
- }
-
- private class TriggerWrapper implements Runnable, Closeable {
- AutoScaling.Trigger trigger;
- ScheduledFuture<?> scheduledFuture;
- TriggerEventQueue queue;
- boolean replay;
- volatile boolean isClosed;
-
- TriggerWrapper(AutoScaling.Trigger trigger, SolrCloudManager cloudManager, Stats stats) throws IOException {
- this.trigger = trigger;
- this.queue = new TriggerEventQueue(cloudManager, trigger.getName(), stats);
- this.replay = true;
- this.isClosed = false;
- }
-
- public void setReplay(boolean replay) {
- this.replay = replay;
- }
-
- public boolean enqueue(TriggerEvent event) {
- if (isClosed) {
- throw new AlreadyClosedException("ScheduledTrigger " + trigger.getName() + " has been closed.");
- }
- return queue.offerEvent(event);
- }
-
- public TriggerEvent dequeue() {
- if (isClosed) {
- throw new AlreadyClosedException("ScheduledTrigger " + trigger.getName() + " has been closed.");
- }
- TriggerEvent event = queue.pollEvent();
- return event;
- }
-
- @Override
- public void run() {
- if (isClosed) {
- throw new AlreadyClosedException("ScheduledTrigger " + trigger.getName() + " has been closed.");
- }
- // fire a trigger only if an action is not pending
- // note this is not fool proof e.g. it does not prevent an action being executed while a trigger
- // is still executing. There is additional protection against that scenario in the event listener.
- if (!hasPendingActions.get()) {
- // this synchronization is usually never under contention
- // but the only reason to have it here is to ensure that when the set-properties API is used
- // to change the schedule delay, we can safely cancel the old scheduled task
- // and create another one with the new delay without worrying about concurrent
- // execution of the same trigger instance
- synchronized (TriggerWrapper.this) {
- // replay accumulated events on first run, if any
- if (replay) {
- TriggerEvent event;
- // peek first without removing - we may crash before calling the listener
- while ((event = queue.peekEvent()) != null) {
- // override REPLAYING=true
- event.getProperties().put(TriggerEvent.REPLAYING, true);
- if (! trigger.getProcessor().process(event)) {
- log.error("Failed to re-play event, discarding: " + event);
- }
- queue.pollEvent(); // always remove it from queue
- }
- // now restore saved state to possibly generate new events from old state on the first run
- try {
- trigger.restoreState();
- } catch (Exception e) {
- // log but don't throw - see below
- log.error("Error restoring trigger state " + trigger.getName(), e);
- }
- replay = false;
- }
- try {
- trigger.run();
- } catch (Exception e) {
- // log but do not propagate exception because an exception thrown from a scheduled operation
- // will suppress future executions
- log.error("Unexpected exception from trigger: " + trigger.getName(), e);
- } finally {
- // checkpoint after each run
- trigger.saveState();
- }
- }
- }
- }
-
- @Override
- public void close() throws IOException {
- isClosed = true;
- if (scheduledFuture != null) {
- scheduledFuture.cancel(true);
- }
- IOUtils.closeQuietly(trigger);
- }
- }
-
- private class TriggerListeners {
- Map<String, Map<TriggerEventProcessorStage, List<TriggerListener>>> listenersPerStage = new HashMap<>();
- Map<String, TriggerListener> listenersPerName = new HashMap<>();
- ReentrantLock updateLock = new ReentrantLock();
-
- public TriggerListeners() {
-
- }
-
- private TriggerListeners(Map<String, Map<TriggerEventProcessorStage, List<TriggerListener>>> listenersPerStage,
- Map<String, TriggerListener> listenersPerName) {
- this.listenersPerStage = new HashMap<>();
- listenersPerStage.forEach((n, listeners) -> {
- Map<TriggerEventProcessorStage, List<TriggerListener>> perStage = this.listenersPerStage.computeIfAbsent(n, name -> new HashMap<>());
- listeners.forEach((s, lst) -> {
- List<TriggerListener> newLst = perStage.computeIfAbsent(s, stage -> new ArrayList<>());
- newLst.addAll(lst);
- });
- });
- this.listenersPerName = new HashMap<>(listenersPerName);
- }
-
- public TriggerListeners copy() {
- return new TriggerListeners(listenersPerStage, listenersPerName);
- }
-
- void setAutoScalingConfig(AutoScalingConfig autoScalingConfig) {
- updateLock.lock();
- // we will recreate this from scratch
- listenersPerStage.clear();
- try {
- Set<String> triggerNames = autoScalingConfig.getTriggerConfigs().keySet();
- Map<String, AutoScalingConfig.TriggerListenerConfig> configs = autoScalingConfig.getTriggerListenerConfigs();
- Set<String> listenerNames = configs.entrySet().stream().map(entry -> entry.getValue().name).collect(Collectors.toSet());
- // close those for non-existent triggers and nonexistent listener configs
- for (Iterator<Map.Entry<String, TriggerListener>> it = listenersPerName.entrySet().iterator(); it.hasNext(); ) {
- Map.Entry<String, TriggerListener> entry = it.next();
- String name = entry.getKey();
- TriggerListener listener = entry.getValue();
- if (!triggerNames.contains(listener.getConfig().trigger) || !listenerNames.contains(name)) {
- try {
- listener.close();
- } catch (Exception e) {
- log.warn("Exception closing old listener " + listener.getConfig(), e);
- }
- it.remove();
- }
- }
- for (Map.Entry<String, AutoScalingConfig.TriggerListenerConfig> entry : configs.entrySet()) {
- AutoScalingConfig.TriggerListenerConfig config = entry.getValue();
- if (!triggerNames.contains(config.trigger)) {
- log.debug("-- skipping listener for non-existent trigger: {}", config);
- continue;
- }
- // find previous instance and reuse if possible
- TriggerListener oldListener = listenersPerName.get(config.name);
- TriggerListener listener = null;
- if (oldListener != null) {
- if (!oldListener.getConfig().equals(config)) { // changed config
- try {
- oldListener.close();
- } catch (Exception e) {
- log.warn("Exception closing old listener " + oldListener.getConfig(), e);
- }
- } else {
- listener = oldListener; // reuse
- }
- }
- if (listener == null) { // create new instance
- String clazz = config.listenerClass;
- try {
- listener = loader.newInstance(clazz, TriggerListener.class);
- } catch (Exception e) {
- log.warn("Invalid TriggerListener class name '" + clazz + "', skipping...", e);
- }
- if (listener != null) {
- try {
- listener.configure(loader, cloudManager, config);
- listener.init();
- listenersPerName.put(config.name, listener);
- } catch (Exception e) {
- log.warn("Error initializing TriggerListener " + config, e);
- IOUtils.closeQuietly(listener);
- listener = null;
- }
- }
- }
- if (listener == null) {
- continue;
- }
- // add per stage
- for (TriggerEventProcessorStage stage : config.stages) {
- addPerStage(config.trigger, stage, listener);
- }
- // add also for beforeAction / afterAction TriggerStage
- if (!config.beforeActions.isEmpty()) {
- addPerStage(config.trigger, TriggerEventProcessorStage.BEFORE_ACTION, listener);
- }
- if (!config.afterActions.isEmpty()) {
- addPerStage(config.trigger, TriggerEventProcessorStage.AFTER_ACTION, listener);
- }
- }
- } finally {
- updateLock.unlock();
- }
- }
-
- private void addPerStage(String triggerName, TriggerEventProcessorStage stage, TriggerListener listener) {
- Map<TriggerEventProcessorStage, List<TriggerListener>> perStage =
- listenersPerStage.computeIfAbsent(triggerName, k -> new HashMap<>());
- List<TriggerListener> lst = perStage.computeIfAbsent(stage, k -> new ArrayList<>(3));
- lst.add(listener);
- }
-
- void reset() {
- updateLock.lock();
- try {
- listenersPerStage.clear();
- for (TriggerListener listener : listenersPerName.values()) {
- IOUtils.closeQuietly(listener);
- }
- listenersPerName.clear();
- } finally {
- updateLock.unlock();
- }
- }
-
- void close() {
- reset();
- }
-
- List<TriggerListener> getTriggerListeners(String trigger, TriggerEventProcessorStage stage) {
- Map<TriggerEventProcessorStage, List<TriggerListener>> perStage = listenersPerStage.get(trigger);
- if (perStage == null) {
- return Collections.emptyList();
- }
- List<TriggerListener> lst = perStage.get(stage);
- if (lst == null) {
- return Collections.emptyList();
- } else {
- return Collections.unmodifiableList(lst);
- }
- }
-
- void fireListeners(String trigger, TriggerEvent event, TriggerEventProcessorStage stage) {
- fireListeners(trigger, event, stage, null, null, null, null);
- }
-
- void fireListeners(String trigger, TriggerEvent event, TriggerEventProcessorStage stage, String message) {
- fireListeners(trigger, event, stage, null, null, null, message);
- }
-
- void fireListeners(String trigger, TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
- ActionContext context) {
- fireListeners(trigger, event, stage, actionName, context, null, null);
- }
-
- void fireListeners(String trigger, TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
- ActionContext context, Throwable error, String message) {
- updateLock.lock();
- try {
- for (TriggerListener listener : getTriggerListeners(trigger, stage)) {
- if (!listener.isEnabled()) {
- continue;
- }
- if (actionName != null) {
- AutoScalingConfig.TriggerListenerConfig config = listener.getConfig();
- if (stage == TriggerEventProcessorStage.BEFORE_ACTION) {
- if (!config.beforeActions.contains(actionName)) {
- continue;
- }
- } else if (stage == TriggerEventProcessorStage.AFTER_ACTION) {
- if (!config.afterActions.contains(actionName)) {
- continue;
- }
- }
- }
- try {
- listener.onEvent(event, stage, actionName, context, error, message);
- } catch (Exception e) {
- log.warn("Exception running listener " + listener.getConfig(), e);
- }
- }
- } finally {
- updateLock.unlock();
- }
- }
- }
-}