You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/10/23 00:05:56 UTC
[35/52] [abbrv] [partial] lucene-solr:jira/gradle: Add gradle support
for Solr
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
deleted file mode 100644
index 81d56d3..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SearchRateTrigger.java
+++ /dev/null
@@ -1,797 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud.autoscaling;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.AtomicDouble;
-import org.apache.solr.client.solrj.cloud.autoscaling.Policy;
-import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
-import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.params.AutoScalingParams;
-import org.apache.solr.common.params.CollectionParams;
-import org.apache.solr.common.util.Pair;
-import org.apache.solr.common.util.StrUtils;
-import org.apache.solr.common.util.Utils;
-import org.apache.solr.core.SolrResourceLoader;
-import org.apache.solr.metrics.SolrCoreMetricManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Trigger for the {@link org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType#SEARCHRATE} event.
- */
-public class SearchRateTrigger extends TriggerBase {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- public static final String COLLECTIONS_PROP = "collections"; // comma-separated list, split in configure()
- public static final String METRIC_PROP = "metric";
- public static final String MAX_OPS_PROP = "maxOps";
- public static final String MIN_REPLICAS_PROP = "minReplicas";
- public static final String ABOVE_RATE_PROP = "aboveRate";
- public static final String BELOW_RATE_PROP = "belowRate";
- public static final String ABOVE_NODE_RATE_PROP = "aboveNodeRate";
- public static final String BELOW_NODE_RATE_PROP = "belowNodeRate";
- public static final String ABOVE_OP_PROP = "aboveOp";
- public static final String BELOW_OP_PROP = "belowOp";
- public static final String ABOVE_NODE_OP_PROP = "aboveNodeOp";
- public static final String BELOW_NODE_OP_PROP = "belowNodeOp";
-
- // back-compat: legacy property names that configure() still accepts
- public static final String BC_COLLECTION_PROP = "collection"; // adds a single collection unless its value is Policy.ANY
- public static final String BC_RATE_PROP = "rate"; // when present, overrides 'aboveRate'
-
-
- public static final String HOT_NODES = "hotNodes"; // this and the keys below are property names of SearchRateEvent
- public static final String HOT_COLLECTIONS = "hotCollections";
- public static final String HOT_SHARDS = "hotShards";
- public static final String HOT_REPLICAS = "hotReplicas";
- public static final String COLD_NODES = "coldNodes";
- public static final String COLD_COLLECTIONS = "coldCollections";
- public static final String COLD_SHARDS = "coldShards";
- public static final String COLD_REPLICAS = "coldReplicas";
- public static final String VIOLATION_PROP = "violationType"; // set of HOT_* / COLD_* names that produced operations
-
- public static final int DEFAULT_MAX_OPS = 3; // used by configure() when 'maxOps' is not set
- public static final String DEFAULT_METRIC = "QUERY./select.requestTimes:1minRate"; // used by configure() when 'metric' is not set
-
- private String metric;
- private int maxOps;
- private Integer minReplicas = null; // null = not configured; calculateColdOps() then uses the collection's replicationFactor
- private final Set<String> collections = new HashSet<>(); // empty = no restriction (see the checks in run())
- private String shard; // Policy.ANY when not configured
- private String node; // Policy.ANY when not configured
- private double aboveRate; // Double.MAX_VALUE when not configured
- private double belowRate; // -1 when not configured
- private double aboveNodeRate; // Double.MAX_VALUE when not configured
- private double belowNodeRate; // -1 when not configured
- private CollectionParams.CollectionAction aboveOp, belowOp, aboveNodeOp, belowNodeOp; // belowNodeOp has no default and may stay null
- private final Map<String, Long> lastCollectionEvent = new ConcurrentHashMap<>(); // key: collection name, value: time in ns
- private final Map<String, Long> lastNodeEvent = new ConcurrentHashMap<>(); // key: node name, value: time in ns
- private final Map<String, Long> lastShardEvent = new ConcurrentHashMap<>(); // key: "<collection>.<shard>", value: time in ns
- private final Map<String, Long> lastReplicaEvent = new ConcurrentHashMap<>(); // key: "<collection>.<core>", value: time in ns
- private final Map<String, Object> state = new HashMap<>(); // returned by getState(); the constructor registers the four maps above in it
-
- /**
-  * Creates a search rate trigger.
-  * <p>Registers the names of all supported configuration properties (including the
-  * back-compat ones) and publishes the four "last event" maps through the map that
-  * {@link #getState()} returns.</p>
-  *
-  * @param name trigger name, passed to the super constructor together with
-  *             {@link TriggerEventType#SEARCHRATE}
-  */
- public SearchRateTrigger(String name) {
-   super(TriggerEventType.SEARCHRATE, name);
-
-   // names of the properties this trigger understands
-   TriggerUtils.validProperties(validProperties,
-       COLLECTIONS_PROP, AutoScalingParams.SHARD, AutoScalingParams.NODE,
-       METRIC_PROP, MAX_OPS_PROP, MIN_REPLICAS_PROP,
-       ABOVE_OP_PROP, BELOW_OP_PROP, ABOVE_NODE_OP_PROP, BELOW_NODE_OP_PROP,
-       ABOVE_RATE_PROP, BELOW_RATE_PROP, ABOVE_NODE_RATE_PROP, BELOW_NODE_RATE_PROP,
-       // back-compat props
-       BC_COLLECTION_PROP, BC_RATE_PROP);
-
-   // the state map holds references to the live maps, not copies
-   state.put("lastCollectionEvent", lastCollectionEvent);
-   state.put("lastNodeEvent", lastNodeEvent);
-   state.put("lastShardEvent", lastShardEvent);
-   state.put("lastReplicaEvent", lastReplicaEvent);
- }
-
- /**
-  * Parses and validates the trigger configuration.
-  * <ul>
-  *   <li>{@code collections} (comma-separated) and the back-compat {@code collection}
-  *       select the monitored collections; a {@code shard} other than {@code Policy.ANY}
-  *       requires exactly one collection.</li>
-  *   <li>{@code maxOps} defaults to {@link #DEFAULT_MAX_OPS}, {@code metric} to
-  *       {@link #DEFAULT_METRIC}; {@code minReplicas}, when set, must be at least 1.</li>
-  *   <li>At least one of {@code aboveRate} / {@code belowRate} (or the back-compat
-  *       {@code rate}, which overrides {@code aboveRate}) must be set. Unset "above"
-  *       thresholds become {@code Double.MAX_VALUE}, unset "below" thresholds {@code -1}.</li>
-  *   <li>{@code aboveOp}, {@code belowOp} and {@code aboveNodeOp} default to ADDREPLICA,
-  *       DELETEREPLICA and MOVEREPLICA; {@code belowNodeOp} has no default.</li>
-  * </ul>
-  * All four operation properties are validated the same way: a value that
-  * {@code CollectionAction.get} does not recognize (it returns {@code null}) is rejected.
-  *
-  * @param loader       passed through to the super implementation
-  * @param cloudManager passed through to the super implementation
-  * @param properties   trigger configuration properties
-  * @throws TriggerValidationException if any property has an invalid or unrecognized value
-  */
- @Override
- public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException {
-   super.configure(loader, cloudManager, properties);
-   // parse config options
-   String collectionsStr = (String)properties.get(COLLECTIONS_PROP);
-   if (collectionsStr != null) {
-     collections.addAll(StrUtils.splitSmart(collectionsStr, ','));
-   }
-   // check back-compat collection prop
-   collectionsStr = (String)properties.get(BC_COLLECTION_PROP);
-   if (collectionsStr != null && !collectionsStr.equals(Policy.ANY)) {
-     collections.add(collectionsStr);
-   }
-   shard = (String)properties.getOrDefault(AutoScalingParams.SHARD, Policy.ANY);
-   if (!shard.equals(Policy.ANY) && collections.size() != 1) {
-     throw new TriggerValidationException(name, AutoScalingParams.SHARD, "When 'shard' is other than #ANY then exactly one collection name must be set");
-   }
-   node = (String)properties.getOrDefault(AutoScalingParams.NODE, Policy.ANY);
-   metric = (String)properties.getOrDefault(METRIC_PROP, DEFAULT_METRIC);
-
-   String maxOpsStr = String.valueOf(properties.getOrDefault(MAX_OPS_PROP, DEFAULT_MAX_OPS));
-   try {
-     maxOps = Integer.parseInt(maxOpsStr);
-   } catch (Exception e) {
-     throw new TriggerValidationException(name, MAX_OPS_PROP, "invalid value '" + maxOpsStr + "': " + e.toString());
-   }
-
-   Object o = properties.get(MIN_REPLICAS_PROP);
-   if (o != null) {
-     try {
-       minReplicas = Integer.parseInt(o.toString());
-       if (minReplicas < 1) {
-         // thrown only to be reported through the validation exception below
-         throw new Exception("must be at least 1, or not set to use 'replicationFactor'");
-       }
-     } catch (Exception e) {
-       throw new TriggerValidationException(name, MIN_REPLICAS_PROP, "invalid value '" + o + "': " + e.toString());
-     }
-   }
-
-   Object above = properties.get(ABOVE_RATE_PROP);
-   Object below = properties.get(BELOW_RATE_PROP);
-   // back-compat rate prop
-   if (properties.containsKey(BC_RATE_PROP)) {
-     above = properties.get(BC_RATE_PROP);
-   }
-   if (above == null && below == null) {
-     throw new TriggerValidationException(name, ABOVE_RATE_PROP, "at least one of '" +
-         ABOVE_RATE_PROP + "' or '" + BELOW_RATE_PROP + "' must be set");
-   }
-   // unset thresholds get sentinels that can never be violated
-   aboveRate = parseRateProp(above, ABOVE_RATE_PROP, Double.MAX_VALUE);
-   belowRate = parseRateProp(below, BELOW_RATE_PROP, -1);
-
-   // node rates
-   aboveNodeRate = parseRateProp(properties.get(ABOVE_NODE_RATE_PROP), ABOVE_NODE_RATE_PROP, Double.MAX_VALUE);
-   belowNodeRate = parseRateProp(properties.get(BELOW_NODE_RATE_PROP), BELOW_NODE_RATE_PROP, -1);
-
-   aboveOp = parseOpProp(
-       properties.getOrDefault(ABOVE_OP_PROP, CollectionParams.CollectionAction.ADDREPLICA.toLower()), ABOVE_OP_PROP);
-   belowOp = parseOpProp(
-       properties.getOrDefault(BELOW_OP_PROP, CollectionParams.CollectionAction.DELETEREPLICA.toLower()), BELOW_OP_PROP);
-   aboveNodeOp = parseOpProp(
-       properties.getOrDefault(ABOVE_NODE_OP_PROP, CollectionParams.CollectionAction.MOVEREPLICA.toLower()), ABOVE_NODE_OP_PROP);
-   // do NOT set the default to DELETENODE
-   Object belowNodeObj = properties.get(BELOW_NODE_OP_PROP);
-   if (belowNodeObj != null) {
-     belowNodeOp = parseOpProp(belowNodeObj, BELOW_NODE_OP_PROP);
-   }
- }
-
- /** Parses a rate threshold; returns {@code defaultValue} when the property is not set. */
- private double parseRateProp(Object value, String prop, double defaultValue) throws TriggerValidationException {
-   if (value == null) {
-     return defaultValue;
-   }
-   try {
-     return Double.parseDouble(String.valueOf(value));
-   } catch (Exception e) {
-     throw new TriggerValidationException(name, prop, "Invalid configuration value: '" + value + "': " + e.toString());
-   }
- }
-
- /** Resolves a collection action name; rejects values the lookup does not recognize. */
- private CollectionParams.CollectionAction parseOpProp(Object value, String prop) throws TriggerValidationException {
-   CollectionParams.CollectionAction action;
-   try {
-     action = CollectionParams.CollectionAction.get(String.valueOf(value));
-   } catch (Exception e) {
-     throw new TriggerValidationException(getName(), prop, "unrecognized value: '" + value + "'");
-   }
-   // the lookup reports an unknown name by returning null, not by throwing
-   if (action == null) {
-     throw new TriggerValidationException(getName(), prop, "unrecognized value: '" + value + "'");
-   }
-   return action;
- }
-
- @VisibleForTesting
- Map<String, Object> getConfig() {
- Map<String, Object> config = new HashMap<>();
- config.put("name", name);
- config.put(COLLECTIONS_PROP, collections);
- config.put(AutoScalingParams.SHARD, shard);
- config.put(AutoScalingParams.NODE, node);
- config.put(METRIC_PROP, metric);
- config.put(MAX_OPS_PROP, maxOps);
- config.put(MIN_REPLICAS_PROP, minReplicas);
- config.put(ABOVE_RATE_PROP, aboveRate);
- config.put(BELOW_RATE_PROP, belowRate);
- config.put(ABOVE_NODE_RATE_PROP, aboveNodeRate);
- config.put(BELOW_NODE_RATE_PROP, belowNodeRate);
- config.put(ABOVE_OP_PROP, aboveOp);
- config.put(ABOVE_NODE_OP_PROP, aboveNodeOp);
- config.put(BELOW_OP_PROP, belowOp);
- config.put(BELOW_NODE_OP_PROP, belowNodeOp);
- return config;
- }
-
- @Override
- protected Map<String, Object> getState() { // exposes the trigger's last-event timestamps
- return state; // the live map, not a copy; its values are the four last*Event maps registered in the constructor
- }
-
- @Override
- protected void setState(Map<String, Object> state) {
- lastCollectionEvent.clear();
- lastNodeEvent.clear();
- lastShardEvent.clear();
- lastReplicaEvent.clear();
- Map<String, Long> collTimes = (Map<String, Long>)state.get("lastCollectionEvent");
- if (collTimes != null) {
- lastCollectionEvent.putAll(collTimes);
- }
- Map<String, Long> nodeTimes = (Map<String, Long>)state.get("lastNodeEvent");
- if (nodeTimes != null) {
- lastNodeEvent.putAll(nodeTimes);
- }
- Map<String, Long> shardTimes = (Map<String, Long>)state.get("lastShardEvent");
- if (shardTimes != null) {
- lastShardEvent.putAll(shardTimes);
- }
- Map<String, Long> replicaTimes = (Map<String, Long>)state.get("lastReplicaEvent");
- if (replicaTimes != null) {
- lastReplicaEvent.putAll(replicaTimes);
- }
- }
-
- /**
-  * Takes over the last-event timestamps from a previous instance of this trigger.
-  *
-  * @param old the trigger being replaced; asserted to be closed and to have the same name
-  * @throws SolrException with {@code INVALID_STATE} if {@code old} is not a
-  *         {@link SearchRateTrigger}
-  */
- @Override
- public void restoreState(AutoScaling.Trigger old) {
-   assert old.isClosed();
-   if (!(old instanceof SearchRateTrigger)) {
-     throw new SolrException(SolrException.ErrorCode.INVALID_STATE,
-         "Unable to restore state from an unknown type of trigger");
-   }
-   final SearchRateTrigger previous = (SearchRateTrigger)old;
-   assert this.name.equals(previous.name);
-
-   // replace, do not merge: each map is emptied before the old entries are copied in
-   lastCollectionEvent.clear();
-   lastCollectionEvent.putAll(previous.lastCollectionEvent);
-   lastNodeEvent.clear();
-   lastNodeEvent.putAll(previous.lastNodeEvent);
-   lastShardEvent.clear();
-   lastShardEvent.putAll(previous.lastShardEvent);
-   lastReplicaEvent.clear();
-   lastReplicaEvent.putAll(previous.lastReplicaEvent);
- }
-
- @Override
- public void run() { // one evaluation pass: collect rates, detect violations, compute ops, fire at most one event
- AutoScaling.TriggerEventProcessor processor = processorRef.get();
- if (processor == null) { // no processor to hand an event to - nothing to do
- return;
- }
-
- // collection, shard, list(replica + rate)
- Map<String, Map<String, List<ReplicaInfo>>> collectionRates = new HashMap<>();
- // node, rate
- Map<String, AtomicDouble> nodeRates = new HashMap<>();
- // this replication factor only considers replica types that are searchable
- // collection, shard, RF
- Map<String, Map<String, AtomicInteger>> searchableReplicationFactors = new HashMap<>();
-
- ClusterState clusterState = null;
- try {
- clusterState = cloudManager.getClusterStateProvider().getClusterState();
- } catch (IOException e) {
- log.warn("Error getting ClusterState", e);
- return; // give up on this pass; clusterState is needed later by calculateColdOps()
- }
- for (String node : cloudManager.getClusterStateProvider().getLiveNodes()) { // 'node' here shadows the configured this.node
- Map<String, ReplicaInfo> metricTags = new HashMap<>(); // metric tag -> replica the tag was built for
- // coll, shard, replica
- Map<String, Map<String, List<ReplicaInfo>>> infos = cloudManager.getNodeStateProvider().getReplicaInfo(node, Collections.emptyList());
- infos.forEach((coll, shards) -> {
- Map<String, AtomicInteger> replPerShard = searchableReplicationFactors.computeIfAbsent(coll, c -> new HashMap<>());
- shards.forEach((sh, replicas) -> {
- AtomicInteger repl = replPerShard.computeIfAbsent(sh, s -> new AtomicInteger());
- replicas.forEach(replica -> {
- // skip non-active replicas
- if (replica.getState() != Replica.State.ACTIVE) {
- return; // lambda return: moves on to the next replica
- }
- repl.incrementAndGet(); // counts the ACTIVE replicas of this shard across all live nodes
- // we have to translate to the metrics registry name, which uses "_replica_nN" as suffix
- String replicaName = Utils.parseMetricsReplicaName(coll, replica.getCore());
- if (replicaName == null) { // should never happen???
- replicaName = replica.getName(); // which is actually coreNode name...
- }
- String registry = SolrCoreMetricManager.createRegistryName(true, coll, sh, replicaName, null);
- String tag = "metrics:" + registry + ":" + metric;
- metricTags.put(tag, replica);
- });
- });
- });
- if (metricTags.isEmpty()) { // no active replicas on this node - no metrics to request
- continue;
- }
- Map<String, Object> rates = cloudManager.getNodeStateProvider().getNodeValues(node, metricTags.keySet());
- if (log.isDebugEnabled()) {
- log.debug("### rates for node " + node);
- rates.forEach((tag, rate) -> log.debug("### " + tag + "\t" + rate));
- }
- rates.forEach((tag, rate) -> {
- ReplicaInfo info = metricTags.get(tag);
- if (info == null) {
- log.warn("Missing replica info for response tag " + tag);
- } else {
- Map<String, List<ReplicaInfo>> perCollection = collectionRates.computeIfAbsent(info.getCollection(), s -> new HashMap<>());
- List<ReplicaInfo> perShard = perCollection.computeIfAbsent(info.getShard(), s -> new ArrayList<>());
- info = (ReplicaInfo)info.clone(); // the rate is stored on a copy, not on the instance obtained from the node state provider
- info.getVariables().put(AutoScalingParams.RATE, ((Number)rate).doubleValue());
- perShard.add(info);
- AtomicDouble perNode = nodeRates.computeIfAbsent(node, s -> new AtomicDouble());
- perNode.addAndGet(((Number)rate).doubleValue()); // node rate = sum of the rates of its reported replicas
- }
- });
- }
-
- if (log.isDebugEnabled()) {
- collectionRates.forEach((coll, collRates) -> {
- log.debug("## Collection: {}", coll);
- collRates.forEach((s, replicas) -> {
- log.debug("## - {}", s);
- replicas.forEach(ri -> log.debug("## {} {}", ri.getCore(), ri.getVariable(AutoScalingParams.RATE)));
- });
- });
- }
- long now = cloudManager.getTimeSource().getTimeNs();
- Map<String, Double> hotNodes = new HashMap<>();
- Map<String, Double> coldNodes = new HashMap<>();
-
- // check for exceeded rates and filter out those with less than waitFor from previous events
- nodeRates.entrySet().stream()
- .filter(entry -> node.equals(Policy.ANY) || node.equals(entry.getKey())) // only the configured node, or every node when it is ANY
- .forEach(entry -> {
- if (entry.getValue().get() > aboveNodeRate) {
- if (waitForElapsed(entry.getKey(), now, lastNodeEvent)) {
- hotNodes.put(entry.getKey(), entry.getValue().get());
- }
- } else if (entry.getValue().get() < belowNodeRate) {
- if (waitForElapsed(entry.getKey(), now, lastNodeEvent)) {
- coldNodes.put(entry.getKey(), entry.getValue().get());
- }
- } else {
- // no violation - clear waitForElapsed
- // (violation is only valid if it persists throughout waitFor)
- lastNodeEvent.remove(entry.getKey());
- }
- });
-
- Map<String, Map<String, Double>> hotShards = new HashMap<>();
- Map<String, Map<String, Double>> coldShards = new HashMap<>();
- List<ReplicaInfo> hotReplicas = new ArrayList<>();
- List<ReplicaInfo> coldReplicas = new ArrayList<>();
- collectionRates.forEach((coll, shardRates) -> {
- shardRates.forEach((sh, replicaRates) -> {
- double totalShardRate = replicaRates.stream()
- .map(r -> { // side-effecting map: classifies each replica as hot / cold while the stream is summed
- String elapsedKey = r.getCollection() + "." + r.getCore();
- if ((Double)r.getVariable(AutoScalingParams.RATE) > aboveRate) {
- if (waitForElapsed(elapsedKey, now, lastReplicaEvent)) {
- hotReplicas.add(r);
- }
- } else if ((Double)r.getVariable(AutoScalingParams.RATE) < belowRate) {
- if (waitForElapsed(elapsedKey, now, lastReplicaEvent)) {
- coldReplicas.add(r);
- }
- } else {
- // no violation - clear waitForElapsed
- lastReplicaEvent.remove(elapsedKey);
- }
- return r;
- })
- .mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum();
- // calculate average shard rate over all searchable replicas (see SOLR-12470)
- double shardRate = totalShardRate / searchableReplicationFactors.get(coll).get(sh).doubleValue();
- String elapsedKey = coll + "." + sh;
- log.debug("-- {}: totalShardRate={}, shardRate={}", elapsedKey, totalShardRate, shardRate);
- if ((collections.isEmpty() || collections.contains(coll)) &&
- (shard.equals(Policy.ANY) || shard.equals(sh))) { // replicas above are classified regardless of this collection / shard filter
- if (shardRate > aboveRate) {
- if (waitForElapsed(elapsedKey, now, lastShardEvent)) {
- hotShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate);
- }
- } else if (shardRate < belowRate) {
- if (waitForElapsed(elapsedKey, now, lastShardEvent)) {
- coldShards.computeIfAbsent(coll, s -> new HashMap<>()).put(sh, shardRate);
- log.debug("-- coldShard waitFor elapsed {}", elapsedKey);
- } else {
- if (log.isDebugEnabled()) {
- Long lastTime = lastShardEvent.computeIfAbsent(elapsedKey, s -> now);
- long elapsed = TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS);
- log.debug("-- waitFor didn't elapse for {}, waitFor={}, elapsed={}", elapsedKey, getWaitForSecond(), elapsed);
- }
- }
- } else {
- // no violation - clear waitForElapsed
- lastShardEvent.remove(elapsedKey);
- }
- }
- });
- });
-
- Map<String, Double> hotCollections = new HashMap<>();
- Map<String, Double> coldCollections = new HashMap<>();
- collectionRates.forEach((coll, shardRates) -> {
- double total = shardRates.entrySet().stream() // collection rate = sum over all replicas of all its shards
- .mapToDouble(e -> e.getValue().stream()
- .mapToDouble(r -> (Double)r.getVariable(AutoScalingParams.RATE)).sum()).sum();
- if (collections.isEmpty() || collections.contains(coll)) {
- if (total > aboveRate) {
- if (waitForElapsed(coll, now, lastCollectionEvent)) {
- hotCollections.put(coll, total);
- }
- } else if (total < belowRate) {
- if (waitForElapsed(coll, now, lastCollectionEvent)) {
- coldCollections.put(coll, total);
- }
- } else {
- // no violation - clear waitForElapsed
- lastCollectionEvent.remove(coll);
- }
- }
- });
-
- if (hotCollections.isEmpty() &&
- hotShards.isEmpty() &&
- hotReplicas.isEmpty() &&
- hotNodes.isEmpty() &&
- coldCollections.isEmpty() &&
- coldShards.isEmpty() &&
- coldReplicas.isEmpty() &&
- coldNodes.isEmpty()) {
- return; // nothing is in violation for at least waitFor
- }
-
- // generate event
-
- // find the earliest time when a condition was exceeded
- final AtomicLong eventTime = new AtomicLong(now);
- hotCollections.forEach((c, r) -> {
- long time = lastCollectionEvent.get(c); // entry exists: waitForElapsed() created it before the resource was reported
- if (eventTime.get() > time) {
- eventTime.set(time);
- }
- });
- coldCollections.forEach((c, r) -> {
- long time = lastCollectionEvent.get(c);
- if (eventTime.get() > time) {
- eventTime.set(time);
- }
- });
- hotShards.forEach((c, shards) -> {
- shards.forEach((s, r) -> {
- long time = lastShardEvent.get(c + "." + s);
- if (eventTime.get() > time) {
- eventTime.set(time);
- }
- });
- });
- coldShards.forEach((c, shards) -> {
- shards.forEach((s, r) -> {
- long time = lastShardEvent.get(c + "." + s);
- if (eventTime.get() > time) {
- eventTime.set(time);
- }
- });
- });
- hotReplicas.forEach(r -> {
- long time = lastReplicaEvent.get(r.getCollection() + "." + r.getCore());
- if (eventTime.get() > time) {
- eventTime.set(time);
- }
- });
- coldReplicas.forEach(r -> {
- long time = lastReplicaEvent.get(r.getCollection() + "." + r.getCore());
- if (eventTime.get() > time) {
- eventTime.set(time);
- }
- });
- hotNodes.forEach((n, r) -> {
- long time = lastNodeEvent.get(n);
- if (eventTime.get() > time) {
- eventTime.set(time);
- }
- });
- coldNodes.forEach((n, r) -> {
- long time = lastNodeEvent.get(n);
- if (eventTime.get() > time) {
- eventTime.set(time);
- }
- });
-
- final List<TriggerEvent.Op> ops = new ArrayList<>();
- final Set<String> violations = new HashSet<>();
-
- calculateHotOps(ops, violations, searchableReplicationFactors, hotNodes, hotCollections, hotShards, hotReplicas);
- calculateColdOps(ops, violations, clusterState, searchableReplicationFactors, coldNodes, coldCollections, coldShards, coldReplicas);
-
- if (ops.isEmpty()) {
- return; // violations were detected but produced no operations - no event is generated
- }
-
- if (processor.process(new SearchRateEvent(getName(), eventTime.get(), ops,
- hotNodes, hotCollections, hotShards, hotReplicas,
- coldNodes, coldCollections, coldShards, coldReplicas, violations))) {
- // processor.process() returned true: restart the waitFor clock of every resource reported in the event
- hotNodes.keySet().forEach(node -> lastNodeEvent.put(node, now));
- coldNodes.keySet().forEach(node -> lastNodeEvent.put(node, now));
- hotCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now));
- coldCollections.keySet().forEach(coll -> lastCollectionEvent.put(coll, now));
- hotShards.entrySet().forEach(e -> e.getValue()
- .forEach((sh, rate) -> lastShardEvent.put(e.getKey() + "." + sh, now)));
- coldShards.entrySet().forEach(e -> e.getValue()
- .forEach((sh, rate) -> lastShardEvent.put(e.getKey() + "." + sh, now)));
- hotReplicas.forEach(r -> lastReplicaEvent.put(r.getCollection() + "." + r.getCore(), now));
- coldReplicas.forEach(r -> lastReplicaEvent.put(r.getCollection() + "." + r.getCore(), now));
- }
- }
-
- private void calculateHotOps(List<TriggerEvent.Op> ops,
- Set<String> violations,
- Map<String, Map<String, AtomicInteger>> searchableReplicationFactors,
- Map<String, Double> hotNodes,
- Map<String, Double> hotCollections,
- Map<String, Map<String, Double>> hotShards,
- List<ReplicaInfo> hotReplicas) {
- // calculate the number of replicas to add to each hot shard, based on how much the rate was
- // exceeded - but within limits.
-
- // first resolve a situation when only a node is hot but no collection / shard is hot
- // TODO: eventually we may want to commission a new node
- if (!hotNodes.isEmpty()) {
- if (hotShards.isEmpty() && hotCollections.isEmpty()) {
- // move replicas around
- if (aboveNodeOp != null) {
- hotNodes.forEach((n, r) -> {
- ops.add(new TriggerEvent.Op(aboveNodeOp, Suggester.Hint.SRC_NODE, n));
- violations.add(HOT_NODES);
- });
- }
- } else {
- // ignore - hot shards will result in changes that will change hot node status anyway
- }
- }
- // add replicas
- Map<String, Map<String, List<Pair<String, String>>>> hints = new HashMap<>();
-
- // HOT COLLECTIONS
- // currently we don't do anything for hot collections. Theoretically we could add
- // 1 replica more to each shard, based on how close to the threshold each shard is
- // but it's probably better to wait for a shard to become hot and be more precise.
-
- // HOT SHARDS
-
- hotShards.forEach((coll, shards) -> shards.forEach((s, r) -> {
- List<Pair<String, String>> perShard = hints
- .computeIfAbsent(coll, c -> new HashMap<>())
- .computeIfAbsent(s, sh -> new ArrayList<>());
- addReplicaHints(coll, s, r, searchableReplicationFactors.get(coll).get(s).get(), perShard);
- violations.add(HOT_SHARDS);
- }));
-
- // HOT REPLICAS
- // Hot replicas (while their shards are not hot) may be caused by
- // dumb clients that use direct replica URLs - this is beyond our control
- // so ignore them.
-
- hints.values().forEach(m -> m.values().forEach(lst -> lst.forEach(p -> {
- ops.add(new TriggerEvent.Op(aboveOp, Suggester.Hint.COLL_SHARD, p));
- })));
-
- }
-
- /**
- * This method implements a primitive form of proportional controller with a limiter.
- */
- private void addReplicaHints(String collection, String shard, double r, int replicationFactor, List<Pair<String, String>> hints) {
- int numReplicas = (int)Math.round((r - aboveRate) / (double) replicationFactor);
- // in one event add at least 1 replica
- if (numReplicas < 1) {
- numReplicas = 1;
- }
- // ... and at most maxOps replicas
- if (numReplicas > maxOps) {
- numReplicas = maxOps;
- }
- for (int i = 0; i < numReplicas; i++) {
- hints.add(new Pair(collection, shard));
- }
- }
-
- private void calculateColdOps(List<TriggerEvent.Op> ops,
- Set<String> violations,
- ClusterState clusterState,
- Map<String, Map<String, AtomicInteger>> searchableReplicationFactors,
- Map<String, Double> coldNodes,
- Map<String, Double> coldCollections,
- Map<String, Map<String, Double>> coldShards,
- List<ReplicaInfo> coldReplicas) {
- // COLD COLLECTIONS
- // Probably can't do anything reasonable about whole cold collections
- // because they may be needed even if not used.
-
- // COLD SHARDS & COLD REPLICAS:
- // We remove cold replicas only from cold shards, otherwise we are susceptible to uneven
- // replica routing (which is beyond our control).
- // If we removed replicas from non-cold shards we could accidentally bring that shard into
- // the hot range, which would result in adding replica, and that replica could again stay cold due to
- // the same routing issue, which then would lead to removing that replica, etc, etc...
-
- // Remove cold replicas but only when there's at least a minimum number of searchable
- // replicas still available (additional non-searchable replicas may exist, too)
- // NOTE: do this before adding ops for DELETENODE because we don't want to attempt
- // deleting replicas that have been already moved elsewhere
- Map<String, Map<String, List<ReplicaInfo>>> byCollectionByShard = new HashMap<>();
- coldReplicas.forEach(ri -> {
- byCollectionByShard.computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
- .computeIfAbsent(ri.getShard(), s -> new ArrayList<>())
- .add(ri);
- });
- coldShards.forEach((coll, perShard) -> {
- perShard.forEach((shard, rate) -> {
- List<ReplicaInfo> replicas = byCollectionByShard
- .getOrDefault(coll, Collections.emptyMap())
- .getOrDefault(shard, Collections.emptyList());
- if (replicas.isEmpty()) {
- return;
- }
- // only delete if there's at least minRF searchable replicas left
- int rf = searchableReplicationFactors.get(coll).get(shard).get();
- // assume first that we only really need a leader and we may be
- // allowed to remove other replicas
- int minRF = 1;
- // but check the official RF and don't go below that
- Integer RF = clusterState.getCollection(coll).getReplicationFactor();
- if (RF != null) {
- minRF = RF;
- }
- // unless minReplicas is set explicitly
- if (minReplicas != null) {
- minRF = minReplicas;
- }
- if (minRF < 1) {
- minRF = 1;
- }
- if (rf > minRF) {
- // delete at most maxOps replicas at a time
- AtomicInteger limit = new AtomicInteger(Math.min(maxOps, rf - minRF));
- replicas.forEach(ri -> {
- if (limit.get() == 0) {
- return;
- }
- // don't delete a leader
- if (ri.getBool(ZkStateReader.LEADER_PROP, false)) {
- return;
- }
- TriggerEvent.Op op = new TriggerEvent.Op(belowOp,
- Suggester.Hint.COLL_SHARD, new Pair<>(ri.getCollection(), ri.getShard()));
- op.addHint(Suggester.Hint.REPLICA, ri.getName());
- ops.add(op);
- violations.add(COLD_SHARDS);
- limit.decrementAndGet();
- });
- }
- });
- });
-
- // COLD NODES:
- // Unlike the case of hot nodes, if a node is cold then any monitored
- // collections / shards / replicas located on that node are cold, too.
- // HOWEVER, we check only replicas from selected collections / shards,
- // so deleting a cold node is dangerous because it may interfere with these
- // non-monitored resources - this is the reason the default belowNodeOp is null / ignored.
- //
- // Also, note that due to the way activity is measured only nodes that contain any
- // monitored resources are considered - there may be cold nodes in the cluster that don't
- // belong to the monitored collections and they will be ignored.
- if (belowNodeOp != null) {
- coldNodes.forEach((node, rate) -> {
- ops.add(new TriggerEvent.Op(belowNodeOp, Suggester.Hint.SRC_NODE, node));
- violations.add(COLD_NODES);
- });
- }
-
-
- }
-
- private boolean waitForElapsed(String name, long now, Map<String, Long> lastEventMap) {
- Long lastTime = lastEventMap.computeIfAbsent(name, s -> now);
- long elapsed = TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS);
- log.trace("name={}, lastTime={}, elapsed={}, waitFor={}", name, lastTime, elapsed, getWaitForSecond());
- if (TimeUnit.SECONDS.convert(now - lastTime, TimeUnit.NANOSECONDS) < getWaitForSecond()) {
- return false;
- }
- return true;
- }
-
- /**
-  * Event of type {@link TriggerEventType#SEARCHRATE}. Besides the requested operations it
-  * carries, as event properties, every hot and cold resource that was detected and the
-  * set of violation types.
-  */
- public static class SearchRateEvent extends TriggerEvent {
-   /**
-    * @param source      name of the trigger that generated the event
-    * @param eventTime   event time, as passed to the super constructor
-    * @param ops         requested operations, stored under {@code TriggerEvent.REQUESTED_OPS}
-    * @param violations  violation types, stored under {@link #VIOLATION_PROP}
-    */
-   public SearchRateEvent(String source, long eventTime, List<Op> ops,
-       Map<String, Double> hotNodes,
-       Map<String, Double> hotCollections,
-       Map<String, Map<String, Double>> hotShards,
-       List<ReplicaInfo> hotReplicas,
-       Map<String, Double> coldNodes,
-       Map<String, Double> coldCollections,
-       Map<String, Map<String, Double>> coldShards,
-       List<ReplicaInfo> coldReplicas,
-       Set<String> violations) {
-     super(TriggerEventType.SEARCHRATE, source, eventTime, null);
-     properties.put(TriggerEvent.REQUESTED_OPS, ops);
-
-     // resources above their thresholds
-     properties.put(HOT_NODES, hotNodes);
-     properties.put(HOT_COLLECTIONS, hotCollections);
-     properties.put(HOT_SHARDS, hotShards);
-     properties.put(HOT_REPLICAS, hotReplicas);
-
-     // resources below their thresholds
-     properties.put(COLD_NODES, coldNodes);
-     properties.put(COLD_COLLECTIONS, coldCollections);
-     properties.put(COLD_SHARDS, coldShards);
-     properties.put(COLD_REPLICAS, coldReplicas);
-
-     properties.put(VIOLATION_PROP, violations);
-   }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/autoscaling/SystemLogListener.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SystemLogListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/SystemLogListener.java
deleted file mode 100644
index c6f0e68..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/SystemLogListener.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud.autoscaling;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.lang.invoke.MethodHandles;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.StringJoiner;
-
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
-import org.apache.solr.client.solrj.request.UpdateRequest;
-import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.params.CollectionAdminParams;
-import org.apache.solr.common.params.CommonParams;
-import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.Utils;
-import org.apache.solr.core.SolrResourceLoader;
-import org.apache.solr.util.IdUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This listener saves events to the {@link CollectionAdminParams#SYSTEM_COLL} collection.
- * <p>Configuration properties:</p>
- * <ul>
- * <li>collection - optional string, specifies what collection should be used for storing events. Default value
- * is {@link CollectionAdminParams#SYSTEM_COLL}.</li>
- * </ul>
- */
-public class SystemLogListener extends TriggerListenerBase {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- public static final String SOURCE_FIELD = "source_s";
- public static final String EVENT_SOURCE_FIELD = "event.source_s";
- public static final String EVENT_TYPE_FIELD = "event.type_s";
- public static final String STAGE_FIELD = "stage_s";
- public static final String ACTION_FIELD = "action_s";
- public static final String MESSAGE_FIELD = "message_t";
- public static final String BEFORE_ACTIONS_FIELD = "before.actions_ss";
- public static final String AFTER_ACTIONS_FIELD = "after.actions_ss";
- public static final String COLLECTIONS_FIELD = "collections_ss";
- public static final String SOURCE = SystemLogListener.class.getSimpleName();
- public static final String DOC_TYPE = "autoscaling_event";
-
- private String collection = CollectionAdminParams.SYSTEM_COLL;
-
- @Override
- public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) throws TriggerValidationException {
- super.configure(loader, cloudManager, config);
- collection = (String)config.properties.getOrDefault(CollectionAdminParams.COLLECTION, CollectionAdminParams.SYSTEM_COLL);
- }
-
- @Override
- public void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context,
- Throwable error, String message) throws Exception {
- try {
- SolrInputDocument doc = new SolrInputDocument();
- doc.addField(CommonParams.TYPE, DOC_TYPE);
- doc.addField(SOURCE_FIELD, SOURCE);
- doc.addField("id", IdUtils.timeRandomId());
- doc.addField("event.id_s", event.getId());
- doc.addField(EVENT_TYPE_FIELD, event.getEventType().toString());
- doc.addField(EVENT_SOURCE_FIELD, event.getSource());
- doc.addField("event.time_l", event.getEventTime());
- doc.addField("timestamp", new Date());
- addMap("event.property.", doc, event.getProperties());
- doc.addField(STAGE_FIELD, stage.toString());
- if (actionName != null) {
- doc.addField(ACTION_FIELD, actionName);
- }
- if (message != null) {
- doc.addField(MESSAGE_FIELD, message);
- }
- addError(doc, error);
- // add JSON versions of event and context
- String eventJson = Utils.toJSONString(event);
- doc.addField("event_str", eventJson);
- if (context != null) {
- // capture specifics of operations after compute_plan action
- addOperations(doc, (List<SolrRequest>)context.getProperties().get("operations"));
- // capture specifics of responses after execute_plan action
- addResponses(doc, (List<NamedList<Object>>)context.getProperties().get("responses"));
- addActions(BEFORE_ACTIONS_FIELD, doc, (List<String>)context.getProperties().get(TriggerEventProcessorStage.BEFORE_ACTION.toString()));
- addActions(AFTER_ACTIONS_FIELD, doc, (List<String>)context.getProperties().get(TriggerEventProcessorStage.AFTER_ACTION.toString()));
- String contextJson = Utils.toJSONString(context);
- doc.addField("context_str", contextJson);
- }
- UpdateRequest req = new UpdateRequest();
- req.add(doc);
- req.setParam(CollectionAdminParams.COLLECTION, collection);
- cloudManager.request(req);
- } catch (Exception e) {
- if ((e instanceof SolrException) && e.getMessage().contains("Collection not found")) {
- // relatively benign
- log.info("Collection " + collection + " does not exist, disabling logging.");
- enabled = false;
- } else {
- log.warn("Exception sending event to collection " + collection, e);
- }
- }
- }
-
- private void addActions(String field, SolrInputDocument doc, List<String> actions) {
- if (actions == null) {
- return;
- }
- actions.forEach(a -> doc.addField(field, a));
- }
-
- private void addMap(String prefix, SolrInputDocument doc, Map<String, Object> map) {
- map.forEach((k, v) -> {
- if (v instanceof Collection) {
- for (Object o : (Collection)v) {
- doc.addField(prefix + k + "_ss", String.valueOf(o));
- }
- } else {
- doc.addField(prefix + k + "_ss", String.valueOf(v));
- }
- });
- }
-
- private void addOperations(SolrInputDocument doc, List<SolrRequest> operations) {
- if (operations == null || operations.isEmpty()) {
- return;
- }
- Set<String> collections = new HashSet<>();
- for (SolrRequest req : operations) {
- SolrParams params = req.getParams();
- if (params == null) {
- continue;
- }
- if (params.get(CollectionAdminParams.COLLECTION) != null) {
- collections.add(params.get(CollectionAdminParams.COLLECTION));
- }
- // build a whitespace-separated param string
- StringJoiner paramJoiner = new StringJoiner(" ");
- paramJoiner.setEmptyValue("");
- for (Iterator<String> it = params.getParameterNamesIterator(); it.hasNext(); ) {
- final String name = it.next();
- final String [] values = params.getParams(name);
- for (String value : values) {
- paramJoiner.add(name + "=" + value);
- }
- }
- String paramString = paramJoiner.toString();
- if (!paramString.isEmpty()) {
- doc.addField("operations.params_ts", paramString);
- }
- }
- if (!collections.isEmpty()) {
- doc.addField(COLLECTIONS_FIELD, collections);
- }
- }
-
- private void addResponses(SolrInputDocument doc, List<NamedList<Object>> responses) {
- if (responses == null || responses.isEmpty()) {
- return;
- }
- for (NamedList<Object> rsp : responses) {
- Object o = rsp.get("success");
- if (o != null) {
- doc.addField("responses_ts", "success " + o);
- } else {
- o = rsp.get("failure");
- if (o != null) {
- doc.addField("responses_ts", "failure " + o);
- } else { // something else
- doc.addField("responses_ts", Utils.toJSONString(rsp));
- }
- }
- }
- }
-
- private void addError(SolrInputDocument doc, Throwable error) {
- if (error == null) {
- return;
- }
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- error.printStackTrace(pw);
- pw.flush(); pw.close();
- doc.addField("error.message_t", error.getMessage());
- doc.addField("error.details_t", sw.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerAction.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerAction.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerAction.java
deleted file mode 100644
index b873ee6..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerAction.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud.autoscaling;
-
-import java.io.Closeable;
-import java.util.Map;
-
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.core.SolrResourceLoader;
-
-/**
- * Interface for actions performed in response to a trigger being activated
- */
-public interface TriggerAction extends Closeable {
-
- /**
- * Called when action is created but before it's initialized and used.
- * This method should also verify that the configuration parameters are correct.
- * It may be called multiple times.
- * @param loader loader to use for instantiating sub-components
- * @param cloudManager current instance of SolrCloudManager
- * @param properties configuration properties
- * @throws TriggerValidationException contains details of invalid configuration parameters.
- */
- void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException;
-
- /**
- * Called before an action is first used. Any heavy object creation or initialization should
- * be done in this method instead of the constructor or {@link #configure(SolrResourceLoader, SolrCloudManager, Map)} method.
- * @throws Exception if initialization fails
- */
- void init() throws Exception;
-
- /**
- * Returns the name of this action.
- */
- String getName();
-
- /**
- * Processes a trigger event.
- * @param event the event to process
- * @param context action context passed along with the event
- * @throws Exception if processing fails
- */
- void process(TriggerEvent event, ActionContext context) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerActionBase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerActionBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerActionBase.java
deleted file mode 100644
index 7a9f34b..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerActionBase.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud.autoscaling;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.core.SolrResourceLoader;
-
-/**
- * Skeleton {@link TriggerAction} that stores the configuration and validates it
- * against the sets of valid and required property names.
- */
-public abstract class TriggerActionBase implements TriggerAction {
-
-  /** Accumulated configuration properties of this action. */
-  protected Map<String, Object> properties = new HashMap<>();
-  protected SolrResourceLoader loader;
-  protected SolrCloudManager cloudManager;
-  /**
-   * Names of properties accepted by this action. Subclasses may extend this set
-   * via {@link TriggerUtils#validProperties(Set, String...)}.
-   */
-  protected final Set<String> validProperties = new HashSet<>();
-  /**
-   * Names of properties that must be present. Subclasses may extend this set
-   * via {@link TriggerUtils#requiredProperties(Set, Set, String...)}
-   * (a required property is implicitly a valid one).
-   */
-  protected final Set<String> requiredProperties = new HashSet<>();
-
-  protected TriggerActionBase() {
-    // not strictly needed here because they are already checked during instantiation
-    TriggerUtils.validProperties(validProperties, "name", "class");
-  }
-
-  /**
-   * Returns the configured "name" property, or the simple class name when none is set.
-   */
-  @Override
-  public String getName() {
-    final String configured = (String) properties.get("name");
-    return configured != null ? configured : getClass().getSimpleName();
-  }
-
-  /** No resources to release by default. */
-  @Override
-  public void close() throws IOException {
-    // intentionally empty
-  }
-
-  /**
-   * Stores the collaborators, merges the supplied properties into {@link #properties}
-   * and validates the merged configuration.
-   *
-   * @throws TriggerValidationException if the property check reports any problem
-   */
-  @Override
-  public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException {
-    this.loader = loader;
-    this.cloudManager = cloudManager;
-    if (properties != null) {
-      this.properties.putAll(properties);
-    }
-    // validate the config
-    final Map<String, String> problems = new HashMap<>();
-    TriggerUtils.checkProperties(this.properties, problems, requiredProperties, validProperties);
-    if (problems.isEmpty()) {
-      return;
-    }
-    throw new TriggerValidationException(getName(), problems);
-  }
-
-  /** No initialization needed by default. */
-  @Override
-  public void init() throws Exception {
-    // intentionally empty
-  }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerActionException.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerActionException.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerActionException.java
deleted file mode 100644
index 624ce68..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerActionException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud.autoscaling;
-
-/**
- * Exception specific to trigger actions; it carries the names of the trigger
- * and the action it was created for.
- */
-public class TriggerActionException extends Exception {
-
-  /** Trigger name supplied at construction time. */
-  public final String triggerName;
-  /** Action name supplied at construction time. */
-  public final String actionName;
-
-  /**
-   * @param triggerName name of the trigger
-   * @param actionName name of the action
-   * @param message detail message
-   * @param cause underlying cause, passed to {@link Exception#Exception(String, Throwable)}
-   */
-  public TriggerActionException(String triggerName, String actionName, String message, Throwable cause) {
-    super(message, cause);
-    this.actionName = actionName;
-    this.triggerName = triggerName;
-  }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
deleted file mode 100644
index 214552e..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerBase.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud.autoscaling;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.lucene.util.IOUtils;
-import org.apache.solr.client.solrj.cloud.autoscaling.AlreadyExistsException;
-import org.apache.solr.client.solrj.cloud.autoscaling.BadVersionException;
-import org.apache.solr.client.solrj.cloud.DistribStateManager;
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
-
-import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.Utils;
-import org.apache.solr.core.SolrResourceLoader;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base class for {@link org.apache.solr.cloud.autoscaling.AutoScaling.Trigger} implementations.
- * It handles state snapshot / restore in ZK.
- */
-public abstract class TriggerBase implements AutoScaling.Trigger {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- protected final String name;
- protected SolrCloudManager cloudManager;
- protected SolrResourceLoader loader;
- protected DistribStateManager stateManager;
- protected final Map<String, Object> properties = new HashMap<>();
- /**
- * Set of valid property names. Subclasses may add to this set
- * using {@link TriggerUtils#validProperties(Set, String...)}
- */
- protected final Set<String> validProperties = new HashSet<>();
- /**
- * Set of required property names. Subclasses may add to this set
- * using {@link TriggerUtils#requiredProperties(Set, Set, String...)}
- * (required properties are also valid properties).
- */
- protected final Set<String> requiredProperties = new HashSet<>();
- protected final TriggerEventType eventType;
- protected int waitForSecond;
- protected Map<String,Object> lastState;
- protected final AtomicReference<AutoScaling.TriggerEventProcessor> processorRef = new AtomicReference<>();
- protected List<TriggerAction> actions;
- protected boolean enabled;
- protected boolean isClosed;
-
-
- protected TriggerBase(TriggerEventType eventType, String name) {
- this.eventType = eventType;
- this.name = name;
-
- // subclasses may modify this set to include other supported properties
- TriggerUtils.validProperties(validProperties, "name", "class", "event", "enabled", "waitFor", "actions");
- }
-
- @Override
- public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, Map<String, Object> properties) throws TriggerValidationException {
- this.cloudManager = cloudManager;
- this.loader = loader;
- this.stateManager = cloudManager.getDistribStateManager();
- if (properties != null) {
- this.properties.putAll(properties);
- }
- this.enabled = Boolean.parseBoolean(String.valueOf(this.properties.getOrDefault("enabled", "true")));
- this.waitForSecond = ((Number) this.properties.getOrDefault("waitFor", -1L)).intValue();
- List<Map<String, Object>> o = (List<Map<String, Object>>) properties.get("actions");
- if (o != null && !o.isEmpty()) {
- actions = new ArrayList<>(3);
- for (Map<String, Object> map : o) {
- TriggerAction action = null;
- try {
- action = loader.newInstance((String)map.get("class"), TriggerAction.class);
- } catch (Exception e) {
- throw new TriggerValidationException("action", "exception creating action " + map + ": " + e.toString());
- }
- action.configure(loader, cloudManager, map);
- actions.add(action);
- }
- } else {
- actions = Collections.emptyList();
- }
-
-
- Map<String, String> results = new HashMap<>();
- TriggerUtils.checkProperties(this.properties, results, requiredProperties, validProperties);
- if (!results.isEmpty()) {
- throw new TriggerValidationException(name, results);
- }
- }
-
- @Override
- public void init() throws Exception {
- try {
- if (!stateManager.hasData(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH)) {
- stateManager.makePath(ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH);
- }
- } catch (AlreadyExistsException e) {
- // ignore
- } catch (InterruptedException | KeeperException | IOException e) {
- log.warn("Exception checking ZK path " + ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH, e);
- throw e;
- }
- for (TriggerAction action : actions) {
- action.init();
- }
- }
-
- @Override
- public void setProcessor(AutoScaling.TriggerEventProcessor processor) {
- processorRef.set(processor);
- }
-
- @Override
- public AutoScaling.TriggerEventProcessor getProcessor() {
- return processorRef.get();
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public TriggerEventType getEventType() {
- return eventType;
- }
-
- @Override
- public boolean isEnabled() {
- return enabled;
- }
-
- @Override
- public int getWaitForSecond() {
- return waitForSecond;
- }
-
- @Override
- public Map<String, Object> getProperties() {
- return properties;
- }
-
- @Override
- public List<TriggerAction> getActions() {
- return actions;
- }
-
- @Override
- public boolean isClosed() {
- synchronized (this) {
- return isClosed;
- }
- }
-
- @Override
- public void close() throws IOException {
- synchronized (this) {
- isClosed = true;
- IOUtils.closeWhileHandlingException(actions);
- }
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(name, properties);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj == null) {
- return false;
- }
- if (obj.getClass().equals(this.getClass())) {
- TriggerBase that = (TriggerBase) obj;
- return this.name.equals(that.name)
- && this.properties.equals(that.properties);
- }
- return false;
- }
-
- /**
- * Prepare and return internal state of this trigger in a format suitable for persisting in ZK.
- * @return map of internal state properties. Note: values must be supported by {@link Utils#toJSON(Object)}.
- */
- protected abstract Map<String,Object> getState();
-
- /**
- * Restore internal state of this trigger from properties retrieved from ZK.
- * @param state never null but may be empty.
- */
- protected abstract void setState(Map<String,Object> state);
-
- @Override
- public void saveState() {
- Map<String,Object> state = Utils.getDeepCopy(getState(), 10, false, true);
- if (lastState != null && lastState.equals(state)) {
- // skip saving if identical
- return;
- }
- byte[] data = Utils.toJSON(state);
- String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + getName();
- try {
- if (stateManager.hasData(path)) {
- // update
- stateManager.setData(path, data, -1);
- } else {
- // create
- stateManager.createData(path, data, CreateMode.PERSISTENT);
- }
- lastState = state;
- } catch (InterruptedException | BadVersionException | AlreadyExistsException | IOException | KeeperException e) {
- log.warn("Exception updating trigger state '" + path + "'", e);
- }
- }
-
- @Override
- public void restoreState() {
- byte[] data = null;
- String path = ZkStateReader.SOLR_AUTOSCALING_TRIGGER_STATE_PATH + "/" + getName();
- try {
- if (stateManager.hasData(path)) {
- VersionedData versionedData = stateManager.getData(path);
- data = versionedData.getData();
- }
- } catch (Exception e) {
- log.warn("Exception getting trigger state '" + path + "'", e);
- }
- if (data != null) {
- Map<String, Object> restoredState = (Map<String, Object>)Utils.fromJSON(data);
- // make sure lastState is sorted
- restoredState = Utils.getDeepCopy(restoredState, 10, false, true);
- setState(restoredState);
- lastState = restoredState;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java
deleted file mode 100644
index 8e3a348..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEvent.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud.autoscaling;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.solr.client.solrj.cloud.autoscaling.Suggester;
-import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
-import org.apache.solr.common.MapWriter;
-import org.apache.solr.common.params.CollectionParams;
-import org.apache.solr.common.util.Pair;
-import org.apache.solr.common.util.Utils;
-import org.apache.solr.util.IdUtils;
-
-/**
- * Trigger event.
- */
-public class TriggerEvent implements MapWriter {
-  public static final String IGNORED = "ignored";
-  public static final String COOLDOWN = "cooldown";
-  public static final String REPLAYING = "replaying";
-  public static final String NODE_NAMES = "nodeNames";
-  public static final String EVENT_TIMES = "eventTimes";
-  public static final String REQUESTED_OPS = "requestedOps";
-  public static final String UNSUPPORTED_OPS = "unsupportedOps";
-
-  /**
-   * A requested operation: a collection action plus a set of suggester hints.
-   */
-  public static final class Op implements MapWriter {
-    private final CollectionParams.CollectionAction action;
-    private final EnumMap<Suggester.Hint, Object> hints = new EnumMap<>(Suggester.Hint.class);
-
-    public Op(CollectionParams.CollectionAction action) {
-      this.action = action;
-    }
-
-    public Op(CollectionParams.CollectionAction action, Suggester.Hint hint, Object hintValue) {
-      this.action = action;
-      addHint(hint, hintValue);
-    }
-
-    /**
-     * Validates and records a hint value. For multi-valued hints the value (or each element,
-     * if it is a collection) is added to the hint's set; for single-valued hints the string
-     * form of the value replaces any previous one.
-     */
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    public void addHint(Suggester.Hint hint, Object value) {
-      hint.validator.accept(value);
-      if (hint.multiValued) {
-        Collection<?> values = value instanceof Collection ? (Collection) value : Collections.singletonList(value);
-        ((Set) hints.computeIfAbsent(hint, h -> new LinkedHashSet<>())).addAll(values);
-      } else {
-        hints.put(hint, value == null ? null : String.valueOf(value));
-      }
-    }
-
-    public CollectionParams.CollectionAction getAction() {
-      return action;
-    }
-
-    public EnumMap<Suggester.Hint, Object> getHints() {
-      return hints;
-    }
-
-    @Override
-    public void writeMap(EntryWriter ew) throws IOException {
-      ew.put("action", action);
-      ew.put("hints", hints);
-    }
-
-    /**
-     * Rebuilds an Op from its map form.
-     *
-     * @return the Op, or null if the map has no "action" key or the action is not recognized
-     */
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    public static Op fromMap(Map<String, Object> map) {
-      if (!map.containsKey("action")) {
-        return null;
-      }
-      CollectionParams.CollectionAction action = CollectionParams.CollectionAction.get(String.valueOf(map.get("action")));
-      if (action == null) {
-        return null;
-      }
-      Op op = new Op(action);
-      Map<Object, Object> hints = (Map<Object, Object>)map.get("hints");
-      if (hints != null && !hints.isEmpty()) {
-        hints.forEach((k, v) -> {
-          Suggester.Hint h = Suggester.Hint.get(k.toString());
-          if (h == null) {
-            // unknown hint name: skip it
-            return;
-          }
-          Collection<?> values = v instanceof Collection ? (Collection<?>) v : Collections.singletonList(v);
-          values.forEach(vv -> {
-            if (vv instanceof Map) {
-              // maybe it's a Pair?
-              Map<String, Object> m = (Map<String, Object>)vv;
-              if (m.containsKey("first") && m.containsKey("second")) {
-                Pair p = Pair.parse(m);
-                if (p != null) {
-                  op.addHint(h, p);
-                  return;
-                }
-              }
-            }
-            op.addHint(h, vv);
-          });
-        });
-      }
-      return op;
-    }
-
-    @Override
-    public String toString() {
-      return "Op{" +
-          "action=" + action +
-          ", hints=" + hints +
-          '}';
-    }
-  }
-
-  protected final String id;
-  protected final String source;
-  protected final long eventTime;
-  protected final TriggerEventType eventType;
-  protected final Map<String, Object> properties = new HashMap<>();
-  protected final boolean ignored;
-
-  public TriggerEvent(TriggerEventType eventType, String source, long eventTime,
-                      Map<String, Object> properties) {
-    this(IdUtils.timeRandomId(eventTime), eventType, source, eventTime, properties, false);
-  }
-
-  public TriggerEvent(TriggerEventType eventType, String source, long eventTime,
-                      Map<String, Object> properties, boolean ignored) {
-    this(IdUtils.timeRandomId(eventTime), eventType, source, eventTime, properties, ignored);
-  }
-
-  public TriggerEvent(String id, TriggerEventType eventType, String source, long eventTime,
-                      Map<String, Object> properties) {
-    this(id, eventType, source, eventTime, properties, false);
-  }
-
-  /**
-   * @param properties initial properties, may be null. A shallow copy is stored.
-   */
-  public TriggerEvent(String id, TriggerEventType eventType, String source, long eventTime,
-                      Map<String, Object> properties, boolean ignored) {
-    this.id = id;
-    this.eventType = eventType;
-    this.source = source;
-    this.eventTime = eventTime;
-    if (properties != null) {
-      this.properties.putAll(properties);
-    }
-    this.ignored = ignored;
-  }
-
-  /**
-   * Unique event id.
-   */
-  public String getId() {
-    return id;
-  }
-
-  /**
-   * Name of the trigger that fired the event.
-   */
-  public String getSource() {
-    return source;
-  }
-
-  /**
-   * Timestamp of the actual event, in nanoseconds.
-   * NOTE: this is NOT the timestamp when the event was fired - events may be fired
-   * much later than the actual condition that generated the event, due to the "waitFor" limit.
-   */
-  public long getEventTime() {
-    return eventTime;
-  }
-
-  /**
-   * Get event properties (modifiable).
-   */
-  public Map<String, Object> getProperties() {
-    return properties;
-  }
-
-  /**
-   * Get a named event property or null if missing.
-   */
-  public Object getProperty(String name) {
-    return properties.get(name);
-  }
-
-  /**
-   * Get a named event property or default value if missing
-   * (a property mapped to null also yields the default).
-   */
-  public Object getProperty(String name, Object defaultValue) {
-    Object v = properties.get(name);
-    return v == null ? defaultValue : v;
-  }
-
-  /**
-   * Event type.
-   */
-  public TriggerEventType getEventType() {
-    return eventType;
-  }
-
-  public boolean isIgnored() {
-    return ignored;
-  }
-
-  /**
-   * Set event properties.
-   *
-   * @param properties may be null. A shallow copy of this parameter is used.
-   */
-  public void setProperties(Map<String, Object> properties) {
-    this.properties.clear();
-    if (properties != null) {
-      this.properties.putAll(properties);
-    }
-  }
-
-  @Override
-  public void writeMap(EntryWriter ew) throws IOException {
-    ew.put("id", id);
-    ew.put("source", source);
-    ew.put("eventTime", eventTime);
-    ew.put("eventType", eventType.toString());
-    ew.put("properties", properties);
-    // only written when set; fromMap treats a missing entry as false
-    if (ignored) {
-      ew.put("ignored", true);
-    }
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-
-    TriggerEvent that = (TriggerEvent) o;
-
-    if (eventTime != that.eventTime) return false;
-    if (!id.equals(that.id)) return false;
-    if (!source.equals(that.source)) return false;
-    if (eventType != that.eventType) return false;
-    if (ignored != that.ignored) return false;
-    return properties.equals(that.properties);
-  }
-
-  @Override
-  public int hashCode() {
-    int result = id.hashCode();
-    result = 31 * result + source.hashCode();
-    result = 31 * result + Long.hashCode(eventTime);
-    result = 31 * result + eventType.hashCode();
-    result = 31 * result + properties.hashCode();
-    result = 31 * result + Boolean.hashCode(ignored);
-    return result;
-  }
-
-  @Override
-  public String toString() {
-    return Utils.toJSONString(this);
-  }
-
-  /**
-   * Rebuilds an event from the map form produced by {@link #writeMap(EntryWriter)}.
-   * The "ignored" entry is optional and defaults to false; "properties" may be absent.
-   *
-   * @param map map with "id", "source", "eventTime", "eventType" and optionally
-   *            "properties" and "ignored"
-   */
-  @SuppressWarnings("unchecked")
-  public static TriggerEvent fromMap(Map<String, Object> map) {
-    String id = (String)map.get("id");
-    String source = (String)map.get("source");
-    long eventTime = ((Number)map.get("eventTime")).longValue();
-    TriggerEventType eventType = TriggerEventType.valueOf((String)map.get("eventType"));
-    Map<String, Object> properties = (Map<String, Object>)map.get("properties");
-    // properly deserialize some well-known complex properties
-    fixOps(TriggerEvent.REQUESTED_OPS, properties);
-    fixOps(TriggerEvent.UNSUPPORTED_OPS, properties);
-    // writeMap emits "ignored" only when true, so restore it to keep the round trip equal
-    Object ignoredValue = map.get("ignored");
-    boolean ignored = ignoredValue != null && Boolean.parseBoolean(String.valueOf(ignoredValue));
-    return new TriggerEvent(id, eventType, source, eventTime, properties, ignored);
-  }
-
-  /**
-   * Replaces, in place, map-form entries of the list stored under {@code type} with
-   * {@link Op} instances. Entries that cannot be converted are left unchanged.
-   *
-   * @param type property name holding the list of ops
-   * @param properties event properties; null is tolerated and treated as empty
-   */
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  public static void fixOps(String type, Map<String, Object> properties) {
-    if (properties == null) {
-      return;
-    }
-    List<Object> ops = (List<Object>)properties.get(type);
-    if (ops != null && !ops.isEmpty()) {
-      for (int i = 0; i < ops.size(); i++) {
-        Object o = ops.get(i);
-        if (o instanceof Map) {
-          TriggerEvent.Op op = TriggerEvent.Op.fromMap((Map)o);
-          if (op != null) {
-            ops.set(i, op);
-          }
-        }
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
deleted file mode 100644
index 057d792..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerEventQueue.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud.autoscaling;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-
-import org.apache.solr.client.solrj.cloud.DistributedQueue;
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.cloud.Stats;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.Utils;
-import org.apache.solr.common.util.TimeSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Queue of {@link TriggerEvent}s belonging to a single trigger. Events are
- * serialized to JSON and stored in a {@link DistributedQueue} created under
- * {@code ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH + "/" + triggerName}.
- * Each event is stamped with {@link #ENQUEUE_TIME} when offered and with
- * {@link #DEQUEUE_TIME} when read back, both taken from the cloud manager's
- * {@link TimeSource}.
- */
-public class TriggerEventQueue {
-  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-  public static final String ENQUEUE_TIME = "_enqueue_time_";
-  public static final String DEQUEUE_TIME = "_dequeue_time_";
-
-  private final String triggerName;
-  private final TimeSource timeSource;
-  private final DistributedQueue delegate;
-
-  /**
-   * @param cloudManager source of the distributed queue factory and of the time source
-   * @param triggerName name of the trigger; used as the last element of the queue path
-   * @param stats currently unused
-   * @throws IOException if the underlying queue cannot be created
-   */
-  public TriggerEventQueue(SolrCloudManager cloudManager, String triggerName, Stats stats) throws IOException {
-    // TODO: collect stats
-    this.delegate = cloudManager.getDistributedQueueFactory().makeQueue(ZkStateReader.SOLR_AUTOSCALING_EVENTS_PATH + "/" + triggerName);
-    this.triggerName = triggerName;
-    this.timeSource = cloudManager.getTimeSource();
-  }
-
-  /**
-   * Stamps the event with {@link #ENQUEUE_TIME} and appends its JSON form to the queue.
-   *
-   * @param event event to add; its properties map is modified
-   * @return true if the event was added, false if serialization or the queue operation failed
-   */
-  public boolean offerEvent(TriggerEvent event) {
-    event.getProperties().put(ENQUEUE_TIME, timeSource.getTimeNs());
-    try {
-      byte[] data = Utils.toJSON(event);
-      delegate.offer(data);
-      return true;
-    } catch (Exception e) {
-      log.warn("Exception adding event {} to queue {}", event, triggerName, e);
-      return false;
-    }
-  }
-
-  /**
-   * Returns the first usable event without removing it from the queue.
-   * Entries at the head that are empty or cannot be parsed are discarded.
-   *
-   * @return the head event, or null if the queue is empty or could not be read
-   */
-  public TriggerEvent peekEvent() {
-    byte[] data;
-    try {
-      while ((data = delegate.peek()) != null) {
-        TriggerEvent event = decode(data);
-        if (event != null) {
-          return event;
-        }
-        // peek() leaves the entry in place, so an unusable head would be
-        // returned again on every iteration; remove it to make progress.
-        delegate.poll();
-      }
-    } catch (Exception e) {
-      log.warn("Exception peeking queue of trigger {}", triggerName, e);
-    }
-    return null;
-  }
-
-  /**
-   * Removes and returns the first usable event. Empty or unparsable entries
-   * encountered on the way are dropped.
-   *
-   * @return the removed event, or null if the queue is empty or could not be read
-   */
-  public TriggerEvent pollEvent() {
-    byte[] data;
-    try {
-      while ((data = delegate.poll()) != null) {
-        TriggerEvent event = decode(data);
-        if (event != null) {
-          return event;
-        }
-      }
-    } catch (Exception e) {
-      log.warn("Exception polling queue of trigger {}", triggerName, e);
-    }
-    return null;
-  }
-
-  /**
-   * Parses one raw queue entry.
-   *
-   * @return the decoded event, or null (after logging a warning) if the entry is empty or invalid
-   */
-  @SuppressWarnings("unchecked")
-  private TriggerEvent decode(byte[] data) {
-    if (data.length == 0) {
-      log.warn("ignoring empty data...");
-      return null;
-    }
-    try {
-      Map<String, Object> map = (Map<String, Object>) Utils.fromJSON(data);
-      return fromMap(map);
-    } catch (Exception e) {
-      // keep the cause: without it a malformed entry is impossible to diagnose
-      log.warn("Invalid event data, ignoring: {}", new String(data, StandardCharsets.UTF_8), e);
-      return null;
-    }
-  }
-
-  /** Builds the event from its map form and stamps it with {@link #DEQUEUE_TIME}. */
-  private TriggerEvent fromMap(Map<String, Object> map) {
-    TriggerEvent res = TriggerEvent.fromMap(map);
-    res.getProperties().put(DEQUEUE_TIME, timeSource.getTimeNs());
-    return res;
-  }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java
deleted file mode 100644
index 234387f..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListener.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud.autoscaling;
-
-import java.io.Closeable;
-
-import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventProcessorStage;
-import org.apache.solr.core.SolrResourceLoader;
-
-/**
- * Implementations of this interface are notified of stages in event processing that they were
- * registered for. Note: instances may be closed and re-created on each auto-scaling config update.
- */
-public interface TriggerListener extends Closeable {
-
- /**
- * Called when listener is created but before it's initialized and used.
- * This method should also verify that the configuration parameters are correct.
- * It may be called multiple times.
- * @param loader loader to use for instantiating sub-components
- * @param cloudManager current instance of SolrCloudManager
- * @param config configuration
- * @throws TriggerValidationException contains details of invalid configuration parameters.
- */
- void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) throws TriggerValidationException;
-
- /**
- * If this method returns false then the listener's {@link #onEvent(TriggerEvent, TriggerEventProcessorStage, String, ActionContext, Throwable, String)}
- * method should not be called.
- * @return false if this listener should not receive events
- */
- boolean isEnabled();
-
- /**
- * Initializes this listener.
- * NOTE(review): presumably invoked after a successful
- * {@link #configure(SolrResourceLoader, SolrCloudManager, AutoScalingConfig.TriggerListenerConfig)}
- * and before any event is delivered - confirm against the caller.
- * @throws Exception if the listener cannot be initialized
- */
- void init() throws Exception;
-
- /**
- * Returns the configuration of this listener.
- * NOTE(review): presumably the instance last passed to
- * {@link #configure(SolrResourceLoader, SolrCloudManager, AutoScalingConfig.TriggerListenerConfig)} - confirm.
- */
- AutoScalingConfig.TriggerListenerConfig getConfig();
-
- /**
- * This method is called when either a particular <code>stage</code> or
- * <code>actionName</code> is reached during event processing.
- * @param event current event being processed
- * @param stage {@link TriggerEventProcessorStage} that this listener was registered for, or null
- * @param actionName {@link TriggerAction} name that this listener was registered for, or null
- * @param context optional {@link ActionContext} when the processing stage is related to an action, or null
- * @param error optional {@link Throwable} error, or null
- * @param message optional message
- * @throws Exception if the listener fails to handle the event
- */
- void onEvent(TriggerEvent event, TriggerEventProcessorStage stage, String actionName, ActionContext context,
- Throwable error, String message) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java
deleted file mode 100644
index 7a323c7..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerListenerBase.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud.autoscaling;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
-import org.apache.solr.client.solrj.cloud.SolrCloudManager;
-import org.apache.solr.core.SolrResourceLoader;
-
-/**
- * Common skeleton for {@link TriggerListener} implementations: keeps the
- * objects handed to {@code configure}, derives the {@code enabled} flag and
- * validates the configured property names.
- */
-public abstract class TriggerListenerBase implements TriggerListener {
-
-  protected AutoScalingConfig.TriggerListenerConfig config;
-  protected SolrCloudManager cloudManager;
-  protected SolrResourceLoader loader;
-  protected boolean enabled;
-  /**
-   * Names of properties accepted by this listener. Subclasses may extend it
-   * via {@link TriggerUtils#validProperties(Set, String...)}.
-   */
-  protected final Set<String> validProperties = new HashSet<>();
-  /**
-   * Names of properties that must be present. Subclasses may extend it via
-   * {@link TriggerUtils#requiredProperties(Set, Set, String...)}, which also
-   * registers the names as valid.
-   */
-  protected final Set<String> requiredProperties = new HashSet<>();
-  /**
-   * Prefixes of property names that are accepted without further checks.
-   * Subclasses may add to this set to allow arbitrary prefixed properties.
-   */
-  protected final Set<String> validPropertyPrefixes = new HashSet<>();
-
-  /** Registers the property names every listener understands; "trigger" is mandatory. */
-  protected TriggerListenerBase() {
-    TriggerUtils.requiredProperties(requiredProperties, validProperties, "trigger");
-    TriggerUtils.validProperties(validProperties, "name", "class", "stage", "beforeAction", "afterAction", "enabled");
-  }
-
-  /**
-   * Stores the arguments, reads the "enabled" property (true when absent) and
-   * checks the remaining property names against the required / valid sets.
-   *
-   * @throws TriggerValidationException if unknown or missing properties were found
-   */
-  @Override
-  public void configure(SolrResourceLoader loader, SolrCloudManager cloudManager, AutoScalingConfig.TriggerListenerConfig config) throws TriggerValidationException {
-    this.loader = loader;
-    this.cloudManager = cloudManager;
-    this.config = config;
-    final Object enabledValue = config.properties.getOrDefault("enabled", true);
-    this.enabled = Boolean.parseBoolean(String.valueOf(enabledValue));
-
-    // Work on a copy so that prefix-matched properties can be excluded from validation.
-    final Map<String, Object> candidates = new HashMap<>(config.properties);
-    candidates.keySet().removeIf(this::hasValidPrefix);
-
-    final Map<String, String> problems = new HashMap<>();
-    TriggerUtils.checkProperties(candidates, problems, requiredProperties, validProperties);
-    if (problems.isEmpty()) {
-      return;
-    }
-    throw new TriggerValidationException(config.name, problems);
-  }
-
-  /** Tells whether the property name starts with one of {@link #validPropertyPrefixes}. */
-  private boolean hasValidPrefix(String propertyName) {
-    for (String prefix : validPropertyPrefixes) {
-      if (propertyName.startsWith(prefix)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public AutoScalingConfig.TriggerListenerConfig getConfig() {
-    return config;
-  }
-
-  @Override
-  public boolean isEnabled() {
-    return enabled;
-  }
-
-  /** Does nothing in this base class. */
-  @Override
-  public void init() throws Exception {
-
-  }
-
-  /** Does nothing in this base class. */
-  @Override
-  public void close() throws IOException {
-
-  }
-}
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0ae21ad0/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerUtils.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerUtils.java b/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerUtils.java
deleted file mode 100644
index 71a1ce4..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/autoscaling/TriggerUtils.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.solr.cloud.autoscaling;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Static helpers for declaring and validating configuration property names
- * and values. Validation problems are reported by adding
- * {@code property name -> message} entries to a caller-supplied map.
- */
-public class TriggerUtils {
-  // validation helper methods
-
-  /**
-   * Registers the names as required; required names are also added to the valid set.
-   */
-  public static void requiredProperties(Set<String> required, Set<String> valid, String... propertyNames) {
-    required.addAll(Arrays.asList(propertyNames));
-    valid.addAll(Arrays.asList(propertyNames));
-  }
-
-  /**
-   * Registers the names as valid (optional) properties.
-   */
-  public static void validProperties(Set<String> valid, String... propertyNames) {
-    valid.addAll(Arrays.asList(propertyNames));
-  }
-
-  /**
-   * Reports unknown property names first, then missing required ones.
-   *
-   * @param properties properties to check
-   * @param results receives one entry per problem found
-   * @param required names that must be present
-   * @param valid names that are allowed
-   */
-  public static void checkProperties(Map<String, Object> properties, Map<String, String> results, Set<String> required, Set<String> valid) {
-    checkValidPropertyNames(properties, results, valid);
-    checkRequiredPropertyNames(properties, results, required);
-  }
-
-  /**
-   * Adds an "unknown property" entry for every key of {@code properties} that is not in {@code valid}.
-   */
-  public static void checkValidPropertyNames(Map<String, Object> properties, Map<String, String> results, Set<String> valid) {
-    Set<String> currentNames = new HashSet<>(properties.keySet());
-    currentNames.removeAll(valid);
-    for (String name : currentNames) {
-      results.put(name, "unknown property");
-    }
-  }
-
-  /**
-   * Adds a "missing required property" entry for every required name that is not a key of {@code properties}.
-   */
-  public static void checkRequiredPropertyNames(Map<String, Object> properties, Map<String, String> results, Set<String> required) {
-    Set<String> requiredNames = new HashSet<>(required);
-    requiredNames.removeAll(properties.keySet());
-    for (String name : requiredNames) {
-      results.put(name, "missing required property");
-    }
-  }
-
-  /**
-   * Checks the presence and, optionally, the type of a single property value.
-   *
-   * @param properties properties to read the value from
-   * @param results receives "missing required value" or "value is not an expected type" under {@code name}
-   * @param name property name
-   * @param required whether a null / absent value is an error
-   * @param acceptClasses accepted value types; when null or empty any non-null value is accepted
-   */
-  public static void checkProperty(Map<String, Object> properties, Map<String, String> results, String name, boolean required, Class<?>... acceptClasses) {
-    Object value = properties.get(name);
-    if (value == null) {
-      if (required) {
-        results.put(name, "missing required value");
-      }
-      // There is nothing to type-check; continuing would dereference null below.
-      return;
-    }
-    if (acceptClasses == null || acceptClasses.length == 0) {
-      return;
-    }
-    for (Class<?> clz : acceptClasses) {
-      if (clz.isInstance(value)) {
-        return;
-      }
-    }
-    results.put(name, "value is not an expected type");
-  }
-}