You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2017/07/12 11:49:19 UTC
[2/2] lucene-solr:jira/solr-11000: SOLR-11000 Make AutoScalingConfig
immutable. Refactor AutoScalingHandler to use AutoScalingConfig. Move some
bean classes to SolrJ, clarify names of enums and constants, add javadoc.
SOLR-11000 Make AutoScalingConfig immutable. Refactor AutoScalingHandler to use AutoScalingConfig.
Move some bean classes to SolrJ, clarify names of enums and constants, add javadoc.
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/1e80ceea
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/1e80ceea
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/1e80ceea
Branch: refs/heads/jira/solr-11000
Commit: 1e80ceea35db6a49a12938bf2f57a1f607f7a442
Parents: 9c8e829
Author: Andrzej Bialecki <ab...@apache.org>
Authored: Wed Jul 12 13:48:24 2017 +0200
Committer: Andrzej Bialecki <ab...@apache.org>
Committed: Wed Jul 12 13:48:24 2017 +0200
----------------------------------------------------------------------
.../src/java/org/apache/solr/cloud/Assign.java | 5 +-
.../org/apache/solr/cloud/ZkController.java | 27 +-
.../solr/cloud/autoscaling/AutoScaling.java | 35 +-
.../cloud/autoscaling/AutoScalingConfig.java | 227 ---------
.../cloud/autoscaling/AutoScalingHandler.java | 395 ++++++--------
.../cloud/autoscaling/ComputePlanAction.java | 1 +
.../cloud/autoscaling/HttpTriggerListener.java | 4 +-
.../cloud/autoscaling/LogTriggerListener.java | 7 +-
.../cloud/autoscaling/NodeAddedTrigger.java | 19 +-
.../solr/cloud/autoscaling/NodeLostTrigger.java | 19 +-
.../autoscaling/OverseerTriggerThread.java | 12 +-
.../cloud/autoscaling/ScheduledTriggers.java | 46 +-
.../solr/cloud/autoscaling/TriggerEvent.java | 9 +-
.../cloud/autoscaling/TriggerEventQueue.java | 3 +-
.../solr/cloud/autoscaling/TriggerListener.java | 6 +-
.../cloud/autoscaling/TriggerListenerBase.java | 1 +
.../AutoAddReplicasPlanActionTest.java | 4 +-
.../autoscaling/AutoScalingHandlerTest.java | 6 +-
.../autoscaling/ComputePlanActionTest.java | 3 +-
.../autoscaling/ExecutePlanActionTest.java | 3 +-
.../cloud/autoscaling/NodeAddedTriggerTest.java | 2 +-
.../cloud/autoscaling/NodeLostTriggerTest.java | 2 +-
.../autoscaling/TriggerIntegrationTest.java | 45 +-
.../cloud/autoscaling/AutoScalingConfig.java | 509 +++++++++++++++++++
.../client/solrj/cloud/autoscaling/Clause.java | 20 +-
.../client/solrj/cloud/autoscaling/Policy.java | 121 +++--
.../solrj/cloud/autoscaling/Preference.java | 19 +-
.../autoscaling/TriggerEventProcessorStage.java | 30 ++
.../cloud/autoscaling/TriggerEventType.java | 31 ++
.../java/org/apache/solr/common/MapWriter.java | 21 +
.../solr/common/params/AutoScalingParams.java | 1 +
.../solrj/cloud/autoscaling/TestPolicy.java | 1 -
32 files changed, 1011 insertions(+), 623 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1e80ceea/solr/core/src/java/org/apache/solr/cloud/Assign.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Assign.java b/solr/core/src/java/org/apache/solr/cloud/Assign.java
index f83dbb7..e5d80a2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Assign.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Assign.java
@@ -39,6 +39,7 @@ import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.SolrClientDataProvider;
import org.apache.solr.client.solrj.impl.ZkClientClusterStateProvider;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.cloud.rule.ReplicaAssigner;
import org.apache.solr.cloud.rule.Rule;
import org.apache.solr.common.SolrException;
@@ -297,8 +298,8 @@ public class Assign {
replicaPositions = getNodesViaRules(clusterState, shard, nrtReplicas, cc, coll, createNodeList, l);
}
String policyName = coll.getStr(POLICY);
- Map autoScalingJson = Utils.getJson(cc.getZkController().getZkClient(), SOLR_AUTOSCALING_CONF_PATH, true);
- if (policyName != null || autoScalingJson.get(Policy.CLUSTER_POLICY) != null) {
+ AutoScalingConfig autoScalingConfig = cc.getZkController().getAutoScalingConfig();
+ if (policyName != null || !autoScalingConfig.getPolicy().getClusterPolicy().isEmpty()) {
replicaPositions = Assign.getPositionsUsingPolicy(collectionName, Collections.singletonList(shard), nrtReplicas, 0, 0,
policyName, cc.getZkController().getZkStateReader(), createNodeList);
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1e80ceea/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 6b64b83..e6fdb3f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -53,8 +53,8 @@ import org.apache.commons.lang.StringUtils;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
-import org.apache.solr.cloud.autoscaling.AutoScaling;
-import org.apache.solr.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.common.SolrException;
@@ -72,6 +72,7 @@ import org.apache.solr.common.cloud.ZkMaintenanceUtils;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
+import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
@@ -578,13 +579,25 @@ public class ZkController {
}
/**
- *
+ * Get current {@link AutoScalingConfig}.
* @return current configuration from <code>autoscaling.json</code>. NOTE:
* this data is retrieved from ZK on each call.
*/
public AutoScalingConfig getAutoScalingConfig() throws KeeperException, InterruptedException {
- Map<String, Object> jsonMap = Utils.getJson(zkClient, ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, true);
- return new AutoScalingConfig(jsonMap);
+ Stat stat = new Stat();
+ stat.setVersion(-1);
+
+ Map<String, Object> map = new HashMap<>();
+ try {
+ byte[] bytes = zkClient.getData(ZkStateReader.SOLR_AUTOSCALING_CONF_PATH, null, stat, true);
+ if (bytes != null && bytes.length > 0) {
+ map = (Map<String, Object>) Utils.fromJSON(bytes);
+ }
+ } catch (KeeperException.NoNodeException e) {
+ // ignore
+ }
+ map.put(AutoScalingParams.ZK_VERSION, stat.getVersion());
+ return new AutoScalingConfig(map);
}
/**
@@ -839,7 +852,7 @@ public class ZkController {
// then don't create markers
boolean createNodes = false;
try {
- createNodes = getAutoScalingConfig().hasTriggerForEvents(AutoScaling.EventType.NODELOST);
+ createNodes = getAutoScalingConfig().hasTriggerForEvents(TriggerEventType.NODELOST);
} catch (KeeperException | InterruptedException e1) {
log.warn("Unable to read autoscaling.json", e1);
}
@@ -938,7 +951,7 @@ public class ZkController {
List<Op> ops = new ArrayList<>(2);
ops.add(Op.create(nodePath, null, zkClient.getZkACLProvider().getACLsToAdd(nodePath), CreateMode.EPHEMERAL));
// if there are nodeAdded triggers don't create nodeAdded markers
- boolean createMarkerNode = getAutoScalingConfig().hasTriggerForEvents(AutoScaling.EventType.NODEADDED);
+ boolean createMarkerNode = getAutoScalingConfig().hasTriggerForEvents(TriggerEventType.NODEADDED);
if (createMarkerNode && !zkClient.exists(nodeAddedPath, true)) {
// use EPHEMERAL so that it disappears if this node goes down
// and no other action is taken
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1e80ceea/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
index fa7311c..c25b00e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScaling.java
@@ -24,34 +24,15 @@ import java.util.Map;
import com.google.common.base.Preconditions;
import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.core.CoreContainer;
public class AutoScaling {
- public enum EventType {
- NODEADDED,
- NODELOST,
- REPLICALOST,
- MANUAL,
- SCHEDULED,
- SEARCHRATE,
- INDEXRATE
- }
-
- public enum EventProcessorStage {
- WAITING,
- STARTED,
- ABORTED,
- SUCCEEDED,
- FAILED,
- BEFORE_ACTION,
- AFTER_ACTION
- }
-
/**
* Implementation of this interface is used for processing events generated by a trigger.
*/
- public interface EventProcessor {
+ public interface TriggerEventProcessor {
/**
* This method is executed for events produced by {@link Trigger#run()}.
@@ -66,7 +47,7 @@ public class AutoScaling {
/**
* Interface for a Solr trigger. Each trigger implements Runnable and Closeable interface. A trigger
* is scheduled using a {@link java.util.concurrent.ScheduledExecutorService} so it is executed as
- * per a configured schedule to check whether the trigger is ready to fire. The {@link Trigger#setProcessor(EventProcessor)}
+ * per a configured schedule to check whether the trigger is ready to fire. The {@link Trigger#setProcessor(TriggerEventProcessor)}
* method should be used to set a processor which is used by implementation of this class whenever
* ready.
* <p>
@@ -76,7 +57,7 @@ public class AutoScaling {
* which can be get/set by a different thread than the one executing the trigger. Therefore, implementations
* should use appropriate synchronization around the listener.
* <p>
- * When a trigger is ready to fire, it calls the {@link EventProcessor#process(TriggerEvent)} event
+ * When a trigger is ready to fire, it calls the {@link TriggerEventProcessor#process(TriggerEvent)} event
* with the proper trigger event object. If that method returns false then it should be interpreted to mean
* that Solr is not ready to process this trigger event and therefore we should retain the state and fire
* at the next invocation of the run() method.
@@ -90,7 +71,7 @@ public class AutoScaling {
/**
* Event type generated by this trigger.
*/
- EventType getEventType();
+ TriggerEventType getEventType();
/** Returns true if this trigger is enabled. */
boolean isEnabled();
@@ -105,10 +86,10 @@ public class AutoScaling {
List<TriggerAction> getActions();
/** Set event processor to call when event is fired. */
- void setProcessor(EventProcessor processor);
+ void setProcessor(TriggerEventProcessor processor);
/** Get event processor. */
- EventProcessor getProcessor();
+ TriggerEventProcessor getProcessor();
/** Return true when this trigger is closed and cannot be used. */
boolean isClosed();
@@ -140,7 +121,7 @@ public class AutoScaling {
this.coreContainer = coreContainer;
}
- public synchronized Trigger create(EventType type, String name, Map<String, Object> props) {
+ public synchronized Trigger create(TriggerEventType type, String name, Map<String, Object> props) {
if (isClosed) {
throw new AlreadyClosedException("TriggerFactory has already been closed, cannot create new triggers");
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1e80ceea/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
deleted file mode 100644
index 54e9170..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingConfig.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud.autoscaling;
-
-import java.lang.invoke.MethodHandles;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
-import org.apache.solr.common.params.AutoScalingParams;
-import org.apache.solr.common.util.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Simple bean representation of <code>autoscaling.json</code>, which parses data
- * lazily.
- */
-public class AutoScalingConfig {
- private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private final Map<String, Object> jsonMap;
-
- private Policy policy;
- private Map<String, TriggerConfig> triggers;
- private Map<String, TriggerListenerConfig> listeners;
-
- /**
- * Bean representation of {@link TriggerListener} config.
- */
- public static class TriggerListenerConfig {
- public final String name;
- public final String trigger;
- public final EnumSet<AutoScaling.EventProcessorStage> stages = EnumSet.noneOf(AutoScaling.EventProcessorStage.class);
- public final String listenerClass;
- public final Set<String> beforeActions;
- public final Set<String> afterActions;
- public final Map<String, Object> properties = new HashMap<>();
-
- public TriggerListenerConfig(String name, Map<String, Object> properties) {
- this.name = name;
- this.properties.putAll(properties);
- trigger = (String)properties.get(AutoScalingParams.TRIGGER);
- List<String> stageNames = getList(AutoScalingParams.STAGE, properties);
- for (String stageName : stageNames) {
- try {
- AutoScaling.EventProcessorStage stage = AutoScaling.EventProcessorStage.valueOf(stageName.toUpperCase(Locale.ROOT));
- stages.add(stage);
- } catch (Exception e) {
- LOG.warn("Invalid stage name '" + name + "' in listener config, skipping: " + properties);
- }
- }
- listenerClass = (String)properties.get(AutoScalingParams.CLASS);
- beforeActions = new HashSet<>(getList(AutoScalingParams.BEFORE_ACTION, properties));
- afterActions = new HashSet<>(getList(AutoScalingParams.AFTER_ACTION, properties));
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- TriggerListenerConfig that = (TriggerListenerConfig) o;
-
- if (name != null ? !name.equals(that.name) : that.name != null) return false;
- if (trigger != null ? !trigger.equals(that.trigger) : that.trigger != null) return false;
- if (!stages.equals(that.stages)) return false;
- if (listenerClass != null ? !listenerClass.equals(that.listenerClass) : that.listenerClass != null) return false;
- if (!beforeActions.equals(that.beforeActions)) return false;
- if (!afterActions.equals(that.afterActions)) return false;
- return properties.equals(that.properties);
- }
- }
-
- /**
- * Bean representation of {@link org.apache.solr.cloud.autoscaling.AutoScaling.Trigger} config.
- */
- public static class TriggerConfig {
- public final String name;
- public final AutoScaling.EventType eventType;
- public final Map<String, Object> properties = new HashMap<>();
-
- public TriggerConfig(String name, Map<String, Object> properties) {
- this.name = name;
- String event = (String) properties.get(AutoScalingParams.EVENT);
- this.eventType = AutoScaling.EventType.valueOf(event.toUpperCase(Locale.ROOT));
- this.properties.putAll(properties);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- TriggerConfig that = (TriggerConfig) o;
-
- if (name != null ? !name.equals(that.name) : that.name != null) return false;
- if (eventType != that.eventType) return false;
- return properties.equals(that.properties);
- }
- }
-
- public AutoScalingConfig(byte[] utf8) {
- this(utf8 != null && utf8.length > 0 ? (Map<String, Object>)Utils.fromJSON(utf8) : Collections.emptyMap());
- }
-
- /**
- * Construct from a JSON map representation.
- * @param jsonMap JSON map representation of the config.
- */
- public AutoScalingConfig(Map<String, Object> jsonMap) {
- this.jsonMap = Utils.getDeepCopy(jsonMap, 10);
- }
-
- /**
- * Return the original JSON map representation that was used for building this config.
- */
- public Map<String, Object> getJsonMap() {
- return jsonMap;
- }
-
- /**
- * Get {@link Policy} configuration.
- */
- public Policy getPolicy() {
- if (policy == null) {
- policy = new Policy(jsonMap);
- }
- return policy;
- }
-
- /**
- * Get trigger configurations.
- */
- public Map<String, TriggerConfig> getTriggerConfigs() {
- if (triggers == null) {
- Map<String, Object> trigMap = (Map<String, Object>)jsonMap.get("triggers");
- if (trigMap == null) {
- triggers = Collections.emptyMap();
- } else {
- triggers = new HashMap<>(trigMap.size());
- for (Map.Entry<String, Object> entry : trigMap.entrySet()) {
- triggers.put(entry.getKey(), new TriggerConfig(entry.getKey(), (Map<String, Object>)entry.getValue()));
- }
- }
- }
- return triggers;
- }
-
- /**
- * Check whether triggers for specific event type exist.
- * @param types list of event types
- * @return true if there's at least one trigger matching at least one event type,
- * false otherwise,
- */
- public boolean hasTriggerForEvents(AutoScaling.EventType... types) {
- if (types == null || types.length == 0) {
- return false;
- }
- for (TriggerConfig config : getTriggerConfigs().values()) {
- for (AutoScaling.EventType type : types) {
- if (config.eventType.equals(type)) {
- return true;
- }
- }
- }
- return false;
- }
-
- /**
- * Get listener configurations.
- */
- public Map<String, TriggerListenerConfig> getTriggerListenerConfigs() {
- if (listeners == null) {
- Map<String, Object> map = (Map<String, Object>)jsonMap.get("listeners");
- if (map == null) {
- listeners = Collections.emptyMap();
- } else {
- listeners = new HashMap<>(map.size());
- for (Map.Entry<String, Object> entry : map.entrySet()) {
- listeners.put(entry.getKey(), new TriggerListenerConfig(entry.getKey(), (Map<String, Object>)entry.getValue()));
- }
- }
- }
- return listeners;
- }
-
- private static List<String> getList(String key, Map<String, Object> properties) {
- return getList(key, properties, null);
- }
-
- private static List<String> getList(String key, Map<String, Object> properties, List<String> defaultList) {
- if (defaultList == null) {
- defaultList = Collections.emptyList();
- }
- Object o = properties.get(key);
- if (o == null) {
- return defaultList;
- }
- if (o instanceof List) {
- return (List)o;
- } else {
- return Collections.singletonList(String.valueOf(o));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1e80ceea/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
index e730088..17ed028 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/AutoScalingHandler.java
@@ -30,19 +30,23 @@ import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import com.google.common.collect.ImmutableSet;
import org.apache.solr.api.Api;
import org.apache.solr.api.ApiBag;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.Cell;
import org.apache.solr.client.solrj.cloud.autoscaling.Clause;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.cloud.autoscaling.Preference;
import org.apache.solr.client.solrj.cloud.autoscaling.Row;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.SolrClientDataProvider;
+import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.CommandOperation;
import org.apache.solr.common.util.StrUtils;
@@ -57,7 +61,6 @@ import org.apache.solr.security.AuthorizationContext;
import org.apache.solr.security.PermissionNameProvider;
import org.apache.solr.util.TimeSource;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,11 +113,18 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown path: " + path);
}
- Map<String, Object> map = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
+ AutoScalingConfig autoScalingConf = container.getZkController().getAutoScalingConfig();
if (parts.size() == 2) {
- rsp.getValues().addAll(map);
+ autoScalingConf.writeMap(new MapWriter.EntryWriter() {
+
+ @Override
+ public MapWriter.EntryWriter put(String k, Object v) throws IOException {
+ rsp.getValues().add(k, v);
+ return this;
+ }
+ });
} else if (parts.size() == 3 && DIAGNOSTICS.equals(parts.get(2))) {
- handleDiagnostics(rsp, map);
+ handleDiagnostics(rsp, autoScalingConf);
}
} else {
if (req.getContentStreams() == null) {
@@ -125,42 +135,49 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
// errors have already been added to the response so there's nothing left to do
return;
}
+ AutoScalingConfig initialConfig = container.getZkController().getAutoScalingConfig();
+ AutoScalingConfig currentConfig = initialConfig;
for (CommandOperation op : ops) {
switch (op.name) {
case CMD_SET_TRIGGER:
- handleSetTrigger(req, rsp, op);
+ currentConfig = handleSetTrigger(req, rsp, op, currentConfig);
break;
case CMD_REMOVE_TRIGGER:
- handleRemoveTrigger(req, rsp, op);
+ currentConfig = handleRemoveTrigger(req, rsp, op, currentConfig);
break;
case CMD_SET_LISTENER:
- handleSetListener(req, rsp, op);
+ currentConfig = handleSetListener(req, rsp, op, currentConfig);
break;
case CMD_REMOVE_LISTENER:
- handleRemoveListener(req, rsp, op);
+ currentConfig = handleRemoveListener(req, rsp, op, currentConfig);
break;
case CMD_SUSPEND_TRIGGER:
- handleSuspendTrigger(req, rsp, op);
+ currentConfig = handleSuspendTrigger(req, rsp, op, currentConfig);
break;
case CMD_RESUME_TRIGGER:
- handleResumeTrigger(req, rsp, op);
+ currentConfig = handleResumeTrigger(req, rsp, op, currentConfig);
break;
case CMD_SET_POLICY:
- handleSetPolicies(req, rsp, op);
+ currentConfig = handleSetPolicies(req, rsp, op, currentConfig);
break;
case CMD_REMOVE_POLICY:
- handleRemovePolicy(req, rsp, op);
+ currentConfig = handleRemovePolicy(req, rsp, op, currentConfig);
break;
case CMD_SET_CLUSTER_PREFERENCES:
- handleSetClusterPreferences(req, rsp, op);
+ currentConfig = handleSetClusterPreferences(req, rsp, op, currentConfig);
break;
case CMD_SET_CLUSTER_POLICY:
- handleSetClusterPolicy(req, rsp, op);
+ currentConfig = handleSetClusterPolicy(req, rsp, op, currentConfig);
break;
default:
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown command: " + op.name);
}
}
+ if (!currentConfig.equals(initialConfig)) {
+ // update in ZK
+ zkSetAutoScalingConfig(container.getZkController().getZkStateReader(), currentConfig);
+ }
+ rsp.getValues().add("result", "success");
}
} catch (Exception e) {
rsp.getValues().add("result", "failure");
@@ -170,8 +187,8 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
}
}
- private void handleDiagnostics(SolrQueryResponse rsp, Map<String, Object> autoScalingConf) throws IOException {
- Policy policy = new Policy(autoScalingConf);
+ private void handleDiagnostics(SolrQueryResponse rsp, AutoScalingConfig autoScalingConf) throws IOException {
+ Policy policy = autoScalingConf.getPolicy();
try (CloudSolrClient build = new CloudSolrClient.Builder()
.withHttpClient(container.getUpdateShardHandler().getHttpClient())
.withZkHost(container.getZkController().getZkServerAddress()).build()) {
@@ -204,90 +221,103 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
}
}
- private void handleSetClusterPolicy(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException, IOException {
- List clusterPolicy = (List) op.getCommandData();
+ private AutoScalingConfig handleSetClusterPolicy(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op,
+ AutoScalingConfig currentConfig) throws KeeperException, InterruptedException, IOException {
+ List<Map<String, Object>> clusterPolicy = (List<Map<String, Object>>) op.getCommandData();
if (clusterPolicy == null || !(clusterPolicy instanceof List)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "A list of cluster policies was not found");
}
- zkSetClusterPolicy(container.getZkController().getZkStateReader(), clusterPolicy);
- rsp.getValues().add("result", "success");
+ List<Clause> cp = clusterPolicy.stream().map(Clause::new).collect(Collectors.toList());
+ Policy p = currentConfig.getPolicy().withClusterPolicy(cp);
+ currentConfig = currentConfig.withPolicy(p);
+ return currentConfig;
}
- private void handleSetClusterPreferences(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException, IOException {
- List preferences = (List) op.getCommandData();
+ private AutoScalingConfig handleSetClusterPreferences(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op,
+ AutoScalingConfig currentConfig) throws KeeperException, InterruptedException, IOException {
+ List<Map<String, Object>> preferences = (List<Map<String, Object>>) op.getCommandData();
if (preferences == null || !(preferences instanceof List)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "A list of cluster preferences not found");
}
- zkSetPreferences(container.getZkController().getZkStateReader(), preferences);
- rsp.getValues().add("result", "success");
+ List<Preference> prefs = preferences.stream().map(Preference::new).collect(Collectors.toList());
+ Policy p = currentConfig.getPolicy().withClusterPreferences(prefs);
+ currentConfig = currentConfig.withPolicy(p);
+ return currentConfig;
}
- private void handleRemovePolicy(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException, IOException {
+ private AutoScalingConfig handleRemovePolicy(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op,
+ AutoScalingConfig currentConfig) throws KeeperException, InterruptedException, IOException {
String policyName = (String) op.getCommandData();
if (policyName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The policy name cannot be empty");
}
- Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
- Map<String, Object> policies = (Map<String, Object>) autoScalingConf.get("policies");
+ Map<String, List<Clause>> policies = currentConfig.getPolicy().getPolicies();
if (policies == null || !policies.containsKey(policyName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No policy exists with name: " + policyName);
}
-
- zkSetPolicies(container.getZkController().getZkStateReader(), policyName, null);
- rsp.getValues().add("result", "success");
+ policies = new HashMap<>(policies);
+ policies.remove(policyName);
+ Policy p = currentConfig.getPolicy().withPolicies(policies);
+ currentConfig = currentConfig.withPolicy(p);
+ return currentConfig;
}
- private void handleSetPolicies(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException, IOException {
- Map<String, Object> policies = op.getDataMap();
- for (Map.Entry<String, Object> policy : policies.entrySet()) {
+ private AutoScalingConfig handleSetPolicies(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op,
+ AutoScalingConfig currentConfig) throws KeeperException, InterruptedException, IOException {
+ Map<String, Object> policiesMap = op.getDataMap();
+ for (Map.Entry<String, Object> policy : policiesMap.entrySet()) {
String policyName = policy.getKey();
if (policyName == null || policyName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The policy name cannot be null or empty");
}
}
-
- zkSetPolicies(container.getZkController().getZkStateReader(), null, policies);
-
- rsp.getValues().add("result", "success");
+ List<String> params = new ArrayList<>(currentConfig.getPolicy().getParams());
+ Map<String, List<Clause>> mergedPolicies = new HashMap<>(currentConfig.getPolicy().getPolicies());
+ Map<String, List<Clause>> newPolicies = Policy.policiesFromMap((Map<String, List<Map<String, Object>>>)op.getCommandData(),
+ params);
+ mergedPolicies.putAll(newPolicies);
+ Policy p = currentConfig.getPolicy().withPolicies(mergedPolicies).withParams(params);
+ currentConfig = currentConfig.withPolicy(p);
+ return currentConfig;
}
- private void handleResumeTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
+ private AutoScalingConfig handleResumeTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op,
+ AutoScalingConfig currentConfig) throws KeeperException, InterruptedException {
String triggerName = op.getStr(NAME);
if (triggerName == null || triggerName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger name cannot be null or empty");
}
- Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
- Map<String, Object> triggers = (Map<String, Object>) autoScalingConf.get("triggers");
+ Map<String, AutoScalingConfig.TriggerConfig> triggers = currentConfig.getTriggerConfigs();
Set<String> changed = new HashSet<>();
- if (triggers == null) {
- if (Policy.EACH.equals(triggerName)) {
- // no harm no foul
- } else {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
- }
- } else {
- if (!Policy.EACH.equals(triggerName) && !triggers.containsKey(triggerName)) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
- }
- for (Map.Entry<String, Object> entry : triggers.entrySet()) {
- if (Policy.EACH.equals(triggerName) || triggerName.equals(entry.getKey())) {
- Map<String, Object> triggerProps = (Map<String, Object>) entry.getValue();
- Boolean enabled = (Boolean)triggerProps.get(ENABLED);
- if (enabled != null && !enabled) {
- triggerProps.put(ENABLED, true);
- zkSetTrigger(container.getZkController().getZkStateReader(), entry.getKey(), triggerProps);
- changed.add(entry.getKey());
- }
+ if (!Policy.EACH.equals(triggerName) && !triggers.containsKey(triggerName)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
+ }
+ Map<String, AutoScalingConfig.TriggerConfig> newTriggers = new HashMap<>();
+ for (Map.Entry<String, AutoScalingConfig.TriggerConfig> entry : triggers.entrySet()) {
+ if (Policy.EACH.equals(triggerName) || triggerName.equals(entry.getKey())) {
+ AutoScalingConfig.TriggerConfig trigger = entry.getValue();
+ if (!trigger.enabled) {
+ trigger = trigger.withEnabled(true);
+ newTriggers.put(entry.getKey(), trigger);
+ changed.add(entry.getKey());
+ } else {
+ newTriggers.put(entry.getKey(), entry.getValue());
}
+ } else {
+ newTriggers.put(entry.getKey(), entry.getValue());
}
}
rsp.getValues().add("changed", changed);
- rsp.getValues().add("result", "success");
+ if (!changed.isEmpty()) {
+ currentConfig = currentConfig.withTriggerConfigs(newTriggers);
+ }
+ return currentConfig;
}
- private void handleSuspendTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
+ private AutoScalingConfig handleSuspendTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op,
+ AutoScalingConfig currentConfig) throws KeeperException, InterruptedException {
String triggerName = op.getStr(NAME);
if (triggerName == null || triggerName.trim().length() == 0) {
@@ -306,55 +336,54 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
}
}
- Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
- Map<String, Object> triggers = (Map<String, Object>) autoScalingConf.get("triggers");
+ Map<String, AutoScalingConfig.TriggerConfig> triggers = currentConfig.getTriggerConfigs();
Set<String> changed = new HashSet<>();
- if (triggers == null) {
- if (Policy.EACH.equals(triggerName)) {
- // no harm no foul
- } else {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
- }
- } else {
- if (!Policy.EACH.equals(triggerName) && !triggers.containsKey(triggerName)) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
- }
- for (Map.Entry<String, Object> entry : triggers.entrySet()) {
- if (Policy.EACH.equals(triggerName) || triggerName.equals(entry.getKey())) {
- Map<String, Object> triggerProps = (Map<String, Object>) entry.getValue();
- Boolean enabled = (Boolean)triggerProps.get(ENABLED);
- if (enabled == null || enabled) {
- triggerProps.put(ENABLED, false);
- if (resumeTime != null) {
- triggerProps.put(RESUME_AT, resumeTime.getTime());
- }
- zkSetTrigger(container.getZkController().getZkStateReader(), entry.getKey(), triggerProps);
- changed.add(entry.getKey());
+ if (!Policy.EACH.equals(triggerName) && !triggers.containsKey(triggerName)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
+ }
+ Map<String, AutoScalingConfig.TriggerConfig> newTriggers = new HashMap<>();
+ for (Map.Entry<String, AutoScalingConfig.TriggerConfig> entry : triggers.entrySet()) {
+ if (Policy.EACH.equals(triggerName) || triggerName.equals(entry.getKey())) {
+ AutoScalingConfig.TriggerConfig trigger = entry.getValue();
+ if (trigger.enabled) {
+ trigger = trigger.withEnabled(false);
+ if (resumeTime != null) {
+ trigger = trigger.withProperty(RESUME_AT, resumeTime.getTime());
}
+ newTriggers.put(entry.getKey(), trigger);
+ changed.add(trigger.name);
+ } else {
+ newTriggers.put(entry.getKey(), entry.getValue());
}
+ } else {
+ newTriggers.put(entry.getKey(), entry.getValue());
}
}
rsp.getValues().add("changed", changed);
- rsp.getValues().add("result", "success");
+ if (!changed.isEmpty()) {
+ currentConfig = currentConfig.withTriggerConfigs(newTriggers);
+ }
+ return currentConfig;
}
- private void handleRemoveListener(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
+ private AutoScalingConfig handleRemoveListener(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op,
+ AutoScalingConfig currentConfig) throws KeeperException, InterruptedException {
String listenerName = op.getStr(NAME);
if (listenerName == null || listenerName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The listener name cannot be null or empty");
}
- Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
- Map<String, Object> listeners = (Map<String, Object>) autoScalingConf.get("listeners");
+ Map<String, AutoScalingConfig.TriggerListenerConfig> listeners = currentConfig.getTriggerListenerConfigs();
if (listeners == null || !listeners.containsKey(listenerName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No listener exists with name: " + listenerName);
}
- zkSetListener(container.getZkController().getZkStateReader(), listenerName, null);
- rsp.getValues().add("result", "success");
+ currentConfig = currentConfig.withoutTriggerListenerConfig(listenerName);
+ return currentConfig;
}
- private void handleSetListener(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
+ private AutoScalingConfig handleSetListener(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op,
+ AutoScalingConfig currentConfig) throws KeeperException, InterruptedException {
String listenerName = op.getStr(NAME);
String triggerName = op.getStr(TRIGGER);
List<String> stageNames = op.getStrs(STAGE, Collections.emptyList());
@@ -365,13 +394,13 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
if (listenerName == null || listenerName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The listener name cannot be null or empty");
}
+ Map<String, AutoScalingConfig.TriggerListenerConfig> listeners = currentConfig.getTriggerListenerConfigs();
- Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
- Map<String, Object> triggers = (Map<String, Object>) autoScalingConf.get("triggers");
+ Map<String, AutoScalingConfig.TriggerConfig> triggers = currentConfig.getTriggerConfigs();
if (triggers == null || !triggers.containsKey(triggerName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "A trigger with the name " + triggerName + " does not exist");
}
- Map<String, Object> triggerProps = (Map<String, Object>) triggers.get(triggerName);
+ AutoScalingConfig.TriggerConfig triggerConfig = triggers.get(triggerName);
if (stageNames.isEmpty() && beforeActions.isEmpty() && afterActions.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Either 'stage' or 'beforeAction' or 'afterAction' must be specified");
@@ -379,7 +408,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
for (String stage : stageNames) {
try {
- AutoScaling.EventProcessorStage.valueOf(stage);
+ TriggerEventProcessorStage.valueOf(stage);
} catch (IllegalArgumentException e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Invalid stage name: " + stage);
}
@@ -396,47 +425,23 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Listener not found: " + listenerClass, e);
}
- List<Map<String, String>> actions = (List<Map<String, String>>) triggerProps.get("actions");
Set<String> actionNames = new HashSet<>();
actionNames.addAll(beforeActions);
actionNames.addAll(afterActions);
- for (Map<String, String> action : actions) {
- actionNames.remove(action.get(NAME));
+ for (AutoScalingConfig.ActionConfig action : triggerConfig.actions) {
+ actionNames.remove(action.name);
}
if (!actionNames.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger '" + triggerName + "' does not have actions named: " + actionNames);
}
-
+ AutoScalingConfig.TriggerListenerConfig listener = new AutoScalingConfig.TriggerListenerConfig(listenerName, op.getValuesExcluding("name"));
// todo - handle races between competing set-trigger and set-listener invocations
- zkSetListener(container.getZkController().getZkStateReader(), listenerName, op.getValuesExcluding("name"));
- rsp.getValues().add("result", "success");
+ currentConfig = currentConfig.withTriggerListenerConfig(listener);
+ return currentConfig;
}
- private void zkSetListener(ZkStateReader reader, String listenerName, Map<String, Object> listenerProperties) throws KeeperException, InterruptedException {
- while (true) {
- Stat stat = new Stat();
- ZkNodeProps loaded = null;
- byte[] data = reader.getZkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, stat, true);
- loaded = ZkNodeProps.load(data);
- Map<String, Object> listeners = (Map<String, Object>) loaded.get("listeners");
- if (listeners == null) listeners = new HashMap<>(1);
- if (listenerProperties != null) {
- listeners.put(listenerName, listenerProperties);
- } else {
- listeners.remove(listenerName);
- }
- loaded = loaded.plus("listeners", listeners);
- try {
- reader.getZkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(loaded), stat.getVersion(), true);
- } catch (KeeperException.BadVersionException bve) {
- // somebody else has changed the configuration so we must retry
- continue;
- }
- break;
- }
- }
-
- private void handleSetTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
+ private AutoScalingConfig handleSetTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op,
+ AutoScalingConfig currentConfig) throws KeeperException, InterruptedException {
String triggerName = op.getStr(NAME);
if (triggerName == null || triggerName.trim().length() == 0) {
@@ -447,7 +452,7 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
if (eventTypeStr == null || eventTypeStr.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The event type cannot be null or empty in trigger: " + triggerName);
}
- AutoScaling.EventType eventType = AutoScaling.EventType.valueOf(eventTypeStr.trim().toUpperCase(Locale.ROOT));
+ TriggerEventType eventType = TriggerEventType.valueOf(eventTypeStr.trim().toUpperCase(Locale.ROOT));
String waitForStr = op.getStr(WAIT_FOR, null);
if (waitForStr != null) {
@@ -482,9 +487,9 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action not found: " + klass, e);
}
}
-
- zkSetTrigger(container.getZkController().getZkStateReader(), triggerName, op.getValuesExcluding("name"));
- rsp.getValues().add("result", "success");
+ AutoScalingConfig.TriggerConfig trigger = new AutoScalingConfig.TriggerConfig(triggerName, op.getValuesExcluding("name"));
+ currentConfig = currentConfig.withTriggerConfig(trigger);
+ return currentConfig;
}
private int parseHumanTime(String timeStr) {
@@ -507,119 +512,48 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
return seconds;
}
- private void handleRemoveTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op) throws KeeperException, InterruptedException {
+ private AutoScalingConfig handleRemoveTrigger(SolrQueryRequest req, SolrQueryResponse rsp, CommandOperation op,
+ AutoScalingConfig currentConfig) throws KeeperException, InterruptedException {
String triggerName = op.getStr(NAME);
boolean removeListeners = op.getBoolean(REMOVE_LISTENERS, false);
if (triggerName == null || triggerName.trim().length() == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The trigger name cannot be null or empty");
}
- Map<String, Object> autoScalingConf = zkReadAutoScalingConf(container.getZkController().getZkStateReader());
- Map<String, Object> triggers = (Map<String, Object>) autoScalingConf.get("triggers");
- if (triggers == null || !triggers.containsKey(triggerName)) {
+ Map<String, AutoScalingConfig.TriggerConfig> triggerConfigs = currentConfig.getTriggerConfigs();
+ if (!triggerConfigs.containsKey(triggerName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No trigger exists with name: " + triggerName);
}
-
- Map<String, Map<String, Object>> listeners = (Map<String, Map<String, Object>>) autoScalingConf.get("listeners");
+ triggerConfigs = new HashMap<>(triggerConfigs);
Set<String> activeListeners = new HashSet<>();
- if (listeners != null) {
- for (Map.Entry<String, Map<String, Object>> entry : listeners.entrySet()) {
- Map<String, Object> listenerProps = entry.getValue();
- if (triggerName.equals(listenerProps.get(TRIGGER)) && !removeListeners) {
- activeListeners.add(entry.getKey());
- }
- }
- }
- if (removeListeners) {
- for (String activeListener : activeListeners) {
- zkSetListener(container.getZkController().getZkStateReader(), activeListener, null);
+ Map<String, AutoScalingConfig.TriggerListenerConfig> listeners = currentConfig.getTriggerListenerConfigs();
+ for (AutoScalingConfig.TriggerListenerConfig listener : listeners.values()) {
+ if (triggerName.equals(listener.trigger)) {
+ activeListeners.add(listener.name);
}
- } else if (!activeListeners.isEmpty()) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- "Cannot remove trigger: " + triggerName + " because it has active listeners: " + activeListeners);
}
-
- zkSetTrigger(container.getZkController().getZkStateReader(), triggerName, null);
- rsp.getValues().add("result", "success");
- }
-
- private void zkSetTrigger(ZkStateReader reader, String triggerName, Map<String, Object> triggerProperties) throws KeeperException, InterruptedException {
- while (true) {
- Stat stat = new Stat();
- ZkNodeProps loaded = null;
- byte[] data = reader.getZkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, stat, true);
- loaded = ZkNodeProps.load(data);
- Map<String, Object> triggers = (Map<String, Object>) loaded.get("triggers");
- if (triggers == null) triggers = new HashMap<>(1);
- if (triggerProperties != null) {
- triggers.put(triggerName, triggerProperties);
- } else {
- triggers.remove(triggerName);
- }
- loaded = loaded.plus("triggers", triggers);
- try {
- reader.getZkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(loaded), stat.getVersion(), true);
- } catch (KeeperException.BadVersionException bve) {
- // somebody else has changed the configuration so we must retry
- continue;
- }
- break;
- }
- }
-
- private void zkSetPolicies(ZkStateReader reader, String policyBeRemoved, Map<String, Object> newPolicies) throws KeeperException, InterruptedException, IOException {
- while (true) {
- Stat stat = new Stat();
- ZkNodeProps loaded = null;
- byte[] data = reader.getZkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, stat, true);
- loaded = ZkNodeProps.load(data);
- Map<String, Object> policies = (Map<String, Object>) loaded.get("policies");
- if (policies == null) policies = new HashMap<>(1);
- if (newPolicies != null) {
- policies.putAll(newPolicies);
+ if (!activeListeners.isEmpty()) {
+ if (removeListeners) {
+ listeners = new HashMap<>(listeners);
+ for (String activeListener : activeListeners) {
+ listeners.remove(activeListener);
+ }
} else {
- policies.remove(policyBeRemoved);
- }
- loaded = loaded.plus("policies", policies);
- verifyAutoScalingConf(loaded.getProperties());
- try {
- reader.getZkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(loaded), stat.getVersion(), true);
- } catch (KeeperException.BadVersionException bve) {
- // somebody else has changed the configuration so we must retry
- continue;
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ "Cannot remove trigger: " + triggerName + " because it has active listeners: " + activeListeners);
}
- break;
}
+ triggerConfigs.remove(triggerName);
+ currentConfig = currentConfig.withTriggerConfigs(triggerConfigs).withTriggerListenerConfigs(listeners);
+ return currentConfig;
}
- private void zkSetPreferences(ZkStateReader reader, List preferences) throws KeeperException, InterruptedException, IOException {
- while (true) {
- Stat stat = new Stat();
- ZkNodeProps loaded = null;
- byte[] data = reader.getZkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, stat, true);
- loaded = ZkNodeProps.load(data);
- loaded = loaded.plus("cluster-preferences", preferences);
- verifyAutoScalingConf(loaded.getProperties());
- try {
- reader.getZkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(loaded), stat.getVersion(), true);
- } catch (KeeperException.BadVersionException bve) {
- // somebody else has changed the configuration so we must retry
- continue;
- }
- break;
- }
- }
- private void zkSetClusterPolicy(ZkStateReader reader, List clusterPolicy) throws KeeperException, InterruptedException, IOException {
+ private void zkSetAutoScalingConfig(ZkStateReader reader, AutoScalingConfig currentConfig) throws KeeperException, InterruptedException, IOException {
while (true) {
- Stat stat = new Stat();
- ZkNodeProps loaded = null;
- byte[] data = reader.getZkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, stat, true);
- loaded = ZkNodeProps.load(data);
- loaded = loaded.plus("cluster-policy", clusterPolicy);
- verifyAutoScalingConf(loaded.getProperties());
+ verifyAutoScalingConf(currentConfig);
try {
- reader.getZkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(loaded), stat.getVersion(), true);
+ reader.getZkClient().setData(SOLR_AUTOSCALING_CONF_PATH, Utils.toJSON(currentConfig), currentConfig.getZkVersion(), true);
} catch (KeeperException.BadVersionException bve) {
// somebody else has changed the configuration so we must retry
continue;
@@ -628,22 +562,15 @@ public class AutoScalingHandler extends RequestHandlerBase implements Permission
}
}
- private void verifyAutoScalingConf(Map<String, Object> autoScalingConf) throws IOException {
+ private void verifyAutoScalingConf(AutoScalingConfig autoScalingConf) throws IOException {
try (CloudSolrClient build = new CloudSolrClient.Builder()
.withHttpClient(container.getUpdateShardHandler().getHttpClient())
.withZkHost(container.getZkController().getZkServerAddress()).build()) {
- Policy policy = new Policy(autoScalingConf);
- Policy.Session session = policy.createSession(new SolrClientDataProvider(build));
+ Policy.Session session = autoScalingConf.getPolicy().createSession(new SolrClientDataProvider(build));
log.debug("Verified autoscaling configuration");
}
}
- private Map<String, Object> zkReadAutoScalingConf(ZkStateReader reader) throws KeeperException, InterruptedException {
- byte[] data = reader.getZkClient().getData(SOLR_AUTOSCALING_CONF_PATH, null, null, true);
- ZkNodeProps loaded = ZkNodeProps.load(data);
- return loaded.getProperties();
- }
-
@Override
public String getDescription() {
return "A handler for autoscaling configuration";
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1e80ceea/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
index cfd9ca3..12a9395 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.SolrClientDataProvider;
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1e80ceea/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
index e50417f..2af4f30 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/HttpTriggerListener.java
@@ -29,6 +29,8 @@ import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.entity.StringEntity;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreContainer;
@@ -86,7 +88,7 @@ public class HttpTriggerListener extends TriggerListenerBase {
}
@Override
- public void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) {
+ public void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context, Throwable error, String message) {
Properties properties = new Properties();
properties.setProperty("stage", stage.toString());
// if configuration used "actionName" but we're in a non-action related stage then PropertiesUtil will
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1e80ceea/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java
index 108f41b..736d946 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/LogTriggerListener.java
@@ -17,10 +17,9 @@
package org.apache.solr.cloud.autoscaling;
-import java.io.IOException;
import java.lang.invoke.MethodHandles;
-import org.apache.solr.core.CoreContainer;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,8 +31,8 @@ public class LogTriggerListener extends TriggerListenerBase {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Override
- public void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName, ActionContext context,
- Throwable error, String message) {
+ public void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context,
+ Throwable error, String message) {
LOG.info("{}: stage={}, actionName={}, event={}, error={}, messsage={}", config.name, stage, actionName, event, error, message);
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1e80ceea/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
index c51e586..2197dd0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeAddedTrigger.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.util.IOUtils;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
@@ -43,7 +44,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Trigger for the {@link org.apache.solr.cloud.autoscaling.AutoScaling.EventType#NODEADDED} event
+ * Trigger for the {@link TriggerEventType#NODEADDED} event
*/
public class NodeAddedTrigger extends TriggerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -52,10 +53,10 @@ public class NodeAddedTrigger extends TriggerBase {
private final Map<String, Object> properties;
private final CoreContainer container;
private final List<TriggerAction> actions;
- private final AtomicReference<AutoScaling.EventProcessor> processorRef;
+ private final AtomicReference<AutoScaling.TriggerEventProcessor> processorRef;
private final boolean enabled;
private final int waitForSecond;
- private final AutoScaling.EventType eventType;
+ private final TriggerEventType eventType;
private final TimeSource timeSource;
private boolean isClosed = false;
@@ -86,7 +87,7 @@ public class NodeAddedTrigger extends TriggerBase {
log.debug("Initial livenodes: {}", lastLiveNodes);
this.enabled = (boolean) properties.getOrDefault("enabled", true);
this.waitForSecond = ((Long) properties.getOrDefault("waitFor", -1L)).intValue();
- this.eventType = AutoScaling.EventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT));
+ this.eventType = TriggerEventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT));
log.debug("NodeAddedTrigger {} instantiated with properties: {}", name, properties);
}
@@ -119,12 +120,12 @@ public class NodeAddedTrigger extends TriggerBase {
}
@Override
- public void setProcessor(AutoScaling.EventProcessor processor) {
+ public void setProcessor(AutoScaling.TriggerEventProcessor processor) {
processorRef.set(processor);
}
@Override
- public AutoScaling.EventProcessor getProcessor() {
+ public AutoScaling.TriggerEventProcessor getProcessor() {
return processorRef.get();
}
@@ -134,7 +135,7 @@ public class NodeAddedTrigger extends TriggerBase {
}
@Override
- public AutoScaling.EventType getEventType() {
+ public TriggerEventType getEventType() {
return eventType;
}
@@ -254,7 +255,7 @@ public class NodeAddedTrigger extends TriggerBase {
long now = timeSource.getTime();
if (TimeUnit.SECONDS.convert(now - timeAdded, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
// fire!
- AutoScaling.EventProcessor processor = processorRef.get();
+ AutoScaling.TriggerEventProcessor processor = processorRef.get();
if (processor != null) {
log.debug("NodeAddedTrigger {} firing registered processor for node: {} added at time {} , now: {}", name, nodeName, timeAdded, now);
if (processor.process(new NodeAddedEvent(getEventType(), getName(), timeAdded, nodeName))) {
@@ -297,7 +298,7 @@ public class NodeAddedTrigger extends TriggerBase {
public static class NodeAddedEvent extends TriggerEvent {
- public NodeAddedEvent(AutoScaling.EventType eventType, String source, long nodeAddedTime, String nodeAdded) {
+ public NodeAddedEvent(TriggerEventType eventType, String source, long nodeAddedTime, String nodeAdded) {
super(eventType, source, nodeAddedTime, Collections.singletonMap(NODE_NAME, nodeAdded));
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1e80ceea/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
index caf051b..fba1f3c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/NodeLostTrigger.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.lucene.util.IOUtils;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
@@ -43,7 +44,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Trigger for the {@link AutoScaling.EventType#NODELOST} event
+ * Trigger for the {@link TriggerEventType#NODELOST} event
*/
public class NodeLostTrigger extends TriggerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -52,10 +53,10 @@ public class NodeLostTrigger extends TriggerBase {
private final Map<String, Object> properties;
private final CoreContainer container;
private final List<TriggerAction> actions;
- private final AtomicReference<AutoScaling.EventProcessor> processorRef;
+ private final AtomicReference<AutoScaling.TriggerEventProcessor> processorRef;
private final boolean enabled;
private final int waitForSecond;
- private final AutoScaling.EventType eventType;
+ private final TriggerEventType eventType;
private final TimeSource timeSource;
private boolean isClosed = false;
@@ -86,7 +87,7 @@ public class NodeLostTrigger extends TriggerBase {
log.debug("Initial livenodes: {}", lastLiveNodes);
this.enabled = (boolean) properties.getOrDefault("enabled", true);
this.waitForSecond = ((Long) properties.getOrDefault("waitFor", -1L)).intValue();
- this.eventType = AutoScaling.EventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT));
+ this.eventType = TriggerEventType.valueOf(properties.get("event").toString().toUpperCase(Locale.ROOT));
}
@Override
@@ -117,12 +118,12 @@ public class NodeLostTrigger extends TriggerBase {
}
@Override
- public void setProcessor(AutoScaling.EventProcessor processor) {
+ public void setProcessor(AutoScaling.TriggerEventProcessor processor) {
processorRef.set(processor);
}
@Override
- public AutoScaling.EventProcessor getProcessor() {
+ public AutoScaling.TriggerEventProcessor getProcessor() {
return processorRef.get();
}
@@ -132,7 +133,7 @@ public class NodeLostTrigger extends TriggerBase {
}
@Override
- public AutoScaling.EventType getEventType() {
+ public TriggerEventType getEventType() {
return eventType;
}
@@ -249,7 +250,7 @@ public class NodeLostTrigger extends TriggerBase {
Long timeRemoved = entry.getValue();
if (TimeUnit.SECONDS.convert(timeSource.getTime() - timeRemoved, TimeUnit.NANOSECONDS) >= getWaitForSecond()) {
// fire!
- AutoScaling.EventProcessor processor = processorRef.get();
+ AutoScaling.TriggerEventProcessor processor = processorRef.get();
if (processor != null) {
log.debug("NodeLostTrigger firing registered processor for lost node: {}", nodeName);
if (processor.process(new NodeLostEvent(getEventType(), getName(), timeRemoved, nodeName))) {
@@ -292,7 +293,7 @@ public class NodeLostTrigger extends TriggerBase {
public static class NodeLostEvent extends TriggerEvent {
- public NodeLostEvent(AutoScaling.EventType eventType, String source, long nodeLostTime, String nodeRemoved) {
+ public NodeLostEvent(TriggerEventType eventType, String source, long nodeLostTime, String nodeRemoved) {
super(eventType, source, nodeLostTime, Collections.singletonMap(NODE_NAME, nodeRemoved));
}
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1e80ceea/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
index 3666e1b..2072640 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/OverseerTriggerThread.java
@@ -23,21 +23,19 @@ import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
-import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
-import org.apache.solr.common.params.AutoScalingParams;
import org.apache.solr.common.util.IOUtils;
-import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -182,10 +180,10 @@ public class OverseerTriggerThread implements Runnable, Closeable {
boolean cleanOldNodeAddedMarkers = true;
// add new triggers and/or replace and close the replaced triggers
for (Map.Entry<String, AutoScaling.Trigger> entry : copy.entrySet()) {
- if (entry.getValue().getEventType().equals(AutoScaling.EventType.NODELOST)) {
+ if (entry.getValue().getEventType().equals(TriggerEventType.NODELOST)) {
cleanOldNodeLostMarkers = false;
}
- if (entry.getValue().getEventType().equals(AutoScaling.EventType.NODEADDED)) {
+ if (entry.getValue().getEventType().equals(TriggerEventType.NODEADDED)) {
cleanOldNodeAddedMarkers = false;
}
scheduledTriggers.add(entry.getValue());
@@ -305,7 +303,7 @@ public class OverseerTriggerThread implements Runnable, Closeable {
for (Map.Entry<String, AutoScalingConfig.TriggerConfig> entry : triggers.entrySet()) {
AutoScalingConfig.TriggerConfig cfg = entry.getValue();
- AutoScaling.EventType eventType = cfg.eventType;
+ TriggerEventType eventType = cfg.event;
String triggerName = entry.getKey();
triggerMap.put(triggerName, triggerFactory.create(eventType, triggerName, cfg.properties));
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1e80ceea/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
index 4b7c0d0..30dd85c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/ScheduledTriggers.java
@@ -40,6 +40,8 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
import org.apache.solr.cloud.ActionThrottle;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController;
@@ -150,7 +152,7 @@ public class ScheduledTriggers implements Closeable {
ScheduledTrigger scheduledSource = scheduledTriggers.get(event.getSource());
if (scheduledSource == null) {
String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the source trigger: %s doesn't exist.", event.toString(), event.getSource());
- listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.FAILED, msg);
+ listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.FAILED, msg);
log.warn(msg);
return false;
}
@@ -158,13 +160,13 @@ public class ScheduledTriggers implements Closeable {
AutoScaling.Trigger source = scheduledSource.trigger;
if (source.isClosed()) {
String msg = String.format(Locale.ROOT, "Ignoring autoscaling event %s because the source trigger: %s has already been closed", event.toString(), source);
- listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.ABORTED, msg);
+ listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.ABORTED, msg);
log.warn(msg);
// we do not want to lose this event just because the trigger was closed, perhaps a replacement will need it
return false;
}
if (hasPendingActions.compareAndSet(false, true)) {
- listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.STARTED);
+ listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.STARTED);
final boolean enqueued;
if (replaying) {
enqueued = false;
@@ -182,21 +184,21 @@ public class ScheduledTriggers implements Closeable {
actionThrottle.markAttemptingAction();
ActionContext actionContext = new ActionContext(coreContainer, newTrigger, new HashMap<>());
for (TriggerAction action : actions) {
- listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.BEFORE_ACTION, action.getName(), actionContext);
+ listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.BEFORE_ACTION, action.getName(), actionContext);
try {
action.process(event, actionContext);
} catch (Exception e) {
- listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.FAILED, action.getName(), actionContext, e, null);
+ listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.FAILED, action.getName(), actionContext, e, null);
log.error("Error executing action: " + action.getName() + " for trigger event: " + event, e);
throw e;
}
- listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.AFTER_ACTION, action.getName(), actionContext);
+ listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.AFTER_ACTION, action.getName(), actionContext);
}
if (enqueued) {
TriggerEvent ev = scheduledTrigger.dequeue();
assert ev.getId().equals(event.getId());
}
- listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.SUCCEEDED);
+ listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.SUCCEEDED);
} finally {
hasPendingActions.set(false);
}
@@ -209,7 +211,7 @@ public class ScheduledTriggers implements Closeable {
+ " is broken! Expected event=" + event + " but got " + ev);
}
}
- listeners.fireListeners(event.getSource(), event, AutoScaling.EventProcessorStage.SUCCEEDED);
+ listeners.fireListeners(event.getSource(), event, TriggerEventProcessorStage.SUCCEEDED);
hasPendingActions.set(false);
}
return true;
@@ -371,7 +373,7 @@ public class ScheduledTriggers implements Closeable {
}
private class TriggerListeners {
- Map<String, Map<AutoScaling.EventProcessorStage, List<TriggerListener>>> listenersPerStage = new HashMap<>();
+ Map<String, Map<TriggerEventProcessorStage, List<TriggerListener>>> listenersPerStage = new HashMap<>();
Map<String, TriggerListener> listenersPerName = new HashMap<>();
ReentrantLock updateLock = new ReentrantLock();
@@ -441,15 +443,15 @@ public class ScheduledTriggers implements Closeable {
continue;
}
// add per stage
- for (AutoScaling.EventProcessorStage stage : config.stages) {
+ for (TriggerEventProcessorStage stage : config.stages) {
addPerStage(config.trigger, stage, listener);
}
// add also for beforeAction / afterAction TriggerStage
if (!config.beforeActions.isEmpty()) {
- addPerStage(config.trigger, AutoScaling.EventProcessorStage.BEFORE_ACTION, listener);
+ addPerStage(config.trigger, TriggerEventProcessorStage.BEFORE_ACTION, listener);
}
if (!config.afterActions.isEmpty()) {
- addPerStage(config.trigger, AutoScaling.EventProcessorStage.AFTER_ACTION, listener);
+ addPerStage(config.trigger, TriggerEventProcessorStage.AFTER_ACTION, listener);
}
}
} finally {
@@ -457,8 +459,8 @@ public class ScheduledTriggers implements Closeable {
}
}
- private void addPerStage(String triggerName, AutoScaling.EventProcessorStage stage, TriggerListener listener) {
- Map<AutoScaling.EventProcessorStage, List<TriggerListener>> perStage =
+ private void addPerStage(String triggerName, TriggerEventProcessorStage stage, TriggerListener listener) {
+ Map<TriggerEventProcessorStage, List<TriggerListener>> perStage =
listenersPerStage.computeIfAbsent(triggerName, k -> new HashMap<>());
List<TriggerListener> lst = perStage.computeIfAbsent(stage, k -> new ArrayList<>(3));
lst.add(listener);
@@ -481,8 +483,8 @@ public class ScheduledTriggers implements Closeable {
reset();
}
- List<TriggerListener> getTriggerListeners(String trigger, AutoScaling.EventProcessorStage stage) {
- Map<AutoScaling.EventProcessorStage, List<TriggerListener>> perStage = listenersPerStage.get(trigger);
+ List<TriggerListener> getTriggerListeners(String trigger, TriggerEventProcessorStage stage) {
+ Map<TriggerEventProcessorStage, List<TriggerListener>> perStage = listenersPerStage.get(trigger);
if (perStage == null) {
return Collections.emptyList();
}
@@ -494,31 +496,31 @@ public class ScheduledTriggers implements Closeable {
}
}
- void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage) {
+ void fireListeners(String trigger, TriggerEvent event, TriggerEventProcessorStage stage) {
fireListeners(trigger, event, stage, null, null, null, null);
}
- void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage, String message) {
+ void fireListeners(String trigger, TriggerEvent event, TriggerEventProcessorStage stage, String message) {
fireListeners(trigger, event, stage, null, null, null, message);
}
- void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName,
+ void fireListeners(String trigger, TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
ActionContext context) {
fireListeners(trigger, event, stage, actionName, context, null, null);
}
- void fireListeners(String trigger, TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName,
+ void fireListeners(String trigger, TriggerEvent event, TriggerEventProcessorStage stage, String actionName,
ActionContext context, Throwable error, String message) {
updateLock.lock();
try {
for (TriggerListener listener : getTriggerListeners(trigger, stage)) {
if (actionName != null) {
AutoScalingConfig.TriggerListenerConfig config = listener.getConfig();
- if (stage == AutoScaling.EventProcessorStage.BEFORE_ACTION) {
+ if (stage == TriggerEventProcessorStage.BEFORE_ACTION) {
if (!config.beforeActions.contains(actionName)) {
continue;
}
- } else if (stage == AutoScaling.EventProcessorStage.AFTER_ACTION) {
+ } else if (stage == TriggerEventProcessorStage.AFTER_ACTION) {
if (!config.afterActions.contains(actionName)) {
continue;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1e80ceea/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java
index fa27759..12d4fef 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.common.MapWriter;
import org.apache.solr.util.IdUtils;
@@ -33,15 +34,15 @@ public class TriggerEvent implements MapWriter {
protected final String id;
protected final String source;
protected final long eventTime;
- protected final AutoScaling.EventType eventType;
+ protected final TriggerEventType eventType;
protected final Map<String, Object> properties = new HashMap<>();
- public TriggerEvent(AutoScaling.EventType eventType, String source, long eventTime,
+ public TriggerEvent(TriggerEventType eventType, String source, long eventTime,
Map<String, Object> properties) {
this(IdUtils.timeRandomId(eventTime), eventType, source, eventTime, properties);
}
- public TriggerEvent(String id, AutoScaling.EventType eventType, String source, long eventTime,
+ public TriggerEvent(String id, TriggerEventType eventType, String source, long eventTime,
Map<String, Object> properties) {
this.id = id;
this.eventType = eventType;
@@ -92,7 +93,7 @@ public class TriggerEvent implements MapWriter {
/**
* Event type.
*/
- public AutoScaling.EventType getEventType() {
+ public TriggerEventType getEventType() {
return eventType;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1e80ceea/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
index 3a73f54..99f641c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
@@ -3,6 +3,7 @@ package org.apache.solr.cloud.autoscaling;
import java.lang.invoke.MethodHandles;
import java.util.Map;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.cloud.DistributedQueue;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.common.cloud.SolrZkClient;
@@ -91,7 +92,7 @@ public class TriggerEventQueue extends DistributedQueue {
String id = (String)map.get("id");
String source = (String)map.get("source");
long eventTime = ((Number)map.get("eventTime")).longValue();
- AutoScaling.EventType eventType = AutoScaling.EventType.valueOf((String)map.get("eventType"));
+ TriggerEventType eventType = TriggerEventType.valueOf((String)map.get("eventType"));
Map<String, Object> properties = (Map<String, Object>)map.get("properties");
TriggerEvent res = new TriggerEvent(id, eventType, source, eventTime, properties);
res.getProperties().put(DEQUEUE_TIME, timeSource.getTime());
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1e80ceea/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java
index 479de49..3688bfc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java
@@ -18,6 +18,8 @@ package org.apache.solr.cloud.autoscaling;
import java.io.Closeable;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
+import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
import org.apache.solr.core.CoreContainer;
/**
@@ -34,12 +36,12 @@ public interface TriggerListener extends Closeable {
* This method is called when either a particular <code>stage</code> or
* <code>actionName</code> is reached during event processing.
* @param event current event being processed
- * @param stage {@link AutoScaling.EventProcessorStage} that this listener was registered for, or null
+ * @param stage {@link TriggerEventProcessorStage} that this listener was registered for, or null
* @param actionName {@link TriggerAction} name that this listener was registered for, or null
* @param context optional {@link ActionContext} when the processing stage is related to an action, or null
* @param error optional {@link Throwable} error, or null
* @param message optional message
*/
- void onEvent(TriggerEvent event, AutoScaling.EventProcessorStage stage, String actionName, ActionContext context,
+ void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context,
Throwable error, String message) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/1e80ceea/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java
index 1cefa0e..01a4413 100644
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java
@@ -18,6 +18,7 @@ package org.apache.solr.cloud.autoscaling;
import java.io.IOException;
+import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.core.CoreContainer;
/**