You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2017/06/28 08:37:28 UTC
[2/2] lucene-solr:feature/autoscaling: Merge branch
'feature/autoscaling' of https://git-wip-us.apache.org/repos/asf/lucene-solr
into feature/autoscaling
Merge branch 'feature/autoscaling' of https://git-wip-us.apache.org/repos/asf/lucene-solr into feature/autoscaling
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/00ad44c7
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/00ad44c7
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/00ad44c7
Branch: refs/heads/feature/autoscaling
Commit: 00ad44c7a2473ad049aaa37e748557ff2874a27d
Parents: 5232e36 39c6fb2
Author: Cao Manh Dat <da...@apache.org>
Authored: Wed Jun 28 15:37:12 2017 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Wed Jun 28 15:37:12 2017 +0700
----------------------------------------------------------------------
.../lucene/spatial-extras/spatial-extras.iml | 5 +-
lucene/CHANGES.txt | 8 +
.../byTask/feeds/SpatialFileQueryMaker.java | 15 +-
.../org/apache/lucene/analysis/Analyzer.java | 4 +-
.../org/apache/lucene/search/DoubleValues.java | 21 +
.../lucene/search/DoubleValuesSource.java | 216 ++--
.../apache/lucene/search/LongValuesSource.java | 82 +-
.../lucene/search/TestDoubleValuesSource.java | 16 +-
.../facet/range/TestRangeFacetCounts.java | 75 +-
.../queries/function/FunctionScoreQuery.java | 9 -
.../lucene/queries/function/ValueSource.java | 260 +++--
.../function/TestFunctionScoreExplanations.java | 8 +-
.../function/TestFunctionScoreQuery.java | 91 +-
lucene/spatial-extras/build.xml | 8 +-
.../org/apache/lucene/spatial/ShapeValues.java | 41 +
.../lucene/spatial/ShapeValuesSource.java | 34 +
.../apache/lucene/spatial/SpatialStrategy.java | 19 +-
.../bbox/BBoxOverlapRatioValueSource.java | 7 +-
.../spatial/bbox/BBoxSimilarityValueSource.java | 79 +-
.../lucene/spatial/bbox/BBoxStrategy.java | 13 +-
.../lucene/spatial/bbox/BBoxValueSource.java | 74 +-
.../composite/CompositeSpatialStrategy.java | 14 +-
.../spatial/composite/CompositeVerifyQuery.java | 32 +-
.../composite/IntersectsRPTVerifyQuery.java | 21 +-
.../prefix/NumberRangePrefixTreeStrategy.java | 4 +-
.../spatial/prefix/PrefixTreeStrategy.java | 4 +-
.../serialized/SerializedDVStrategy.java | 110 +--
.../spatial/util/CachingDoubleValueSource.java | 61 +-
.../util/DistanceToShapeValueSource.java | 68 +-
.../util/ReciprocalDoubleValuesSource.java | 96 ++
.../spatial/util/ShapeAreaValueSource.java | 67 +-
.../ShapeFieldCacheDistanceValueSource.java | 59 +-
.../spatial/util/ShapePredicateValueSource.java | 113 ---
.../spatial/util/ShapeValuesPredicate.java | 99 ++
.../spatial/vector/DistanceValueSource.java | 72 +-
.../spatial/vector/PointVectorStrategy.java | 95 +-
.../lucene/spatial/DistanceStrategyTest.java | 6 -
.../apache/lucene/spatial/SpatialExample.java | 12 +-
.../apache/lucene/spatial/StrategyTestCase.java | 39 +-
.../lucene/spatial/spatial4j/Geo3dRptTest.java | 2 +-
.../Geo3dShapeRectRelationTestCase.java | 2 +-
.../DocumentValueSourceDictionaryTest.java | 15 +
solr/CHANGES.txt | 10 +-
solr/bin/solr | 13 +-
.../ltr/model/MultipleAdditiveTreesModel.java | 16 +-
.../src/java/org/apache/solr/cloud/Assign.java | 6 +-
.../cloud/OverseerCollectionMessageHandler.java | 5 +-
.../org/apache/solr/cloud/ZkController.java | 19 +-
.../cloud/autoscaling/AutoScalingConfig.java | 1 +
.../cloud/autoscaling/AutoScalingHandler.java | 13 +-
.../cloud/autoscaling/ComputePlanAction.java | 1 +
.../org/apache/solr/core/CoreContainer.java | 2 +-
.../solr/handler/admin/CollectionsHandler.java | 2 +-
.../org/apache/solr/legacy/BBoxStrategy.java | 13 +-
.../org/apache/solr/legacy/BBoxValueSource.java | 82 +-
.../apache/solr/legacy/DistanceValueSource.java | 81 +-
.../apache/solr/legacy/PointVectorStrategy.java | 17 +-
.../transform/GeoTransformerFactory.java | 33 +-
.../solr/schema/AbstractSpatialFieldType.java | 10 +-
.../java/org/apache/solr/schema/BBoxField.java | 8 +-
.../solr/schema/LatLonPointSpatialField.java | 46 +-
.../schema/RptWithGeometrySpatialField.java | 60 +-
.../apache/solr/schema/ZkIndexSchemaReader.java | 105 +-
.../distance/GeoDistValueSourceParser.java | 8 +-
.../configsets/_default/conf/solrconfig.xml | 2 +-
.../solr/cloud/CollectionsAPISolrJTest.java | 2 +-
.../autoscaling/AutoScalingHandlerTest.java | 1 +
.../apache/solr/schema/SchemaWatcherTest.java | 56 ++
.../apache/solr/search/TestSolr4Spatial.java | 10 +-
solr/solr-ref-guide/src/graph-traversal.adoc | 296 +++---
solr/solr-ref-guide/src/stream-sources.adoc | 4 +-
.../cloud/autoscaling/AddReplicaSuggester.java | 73 ++
.../client/solrj/cloud/autoscaling/Cell.java | 69 ++
.../client/solrj/cloud/autoscaling/Clause.java | 479 +++++++++
.../cloud/autoscaling/ClusterDataProvider.java | 52 +
.../cloud/autoscaling/MoveReplicaSuggester.java | 83 ++
.../client/solrj/cloud/autoscaling/Operand.java | 124 +++
.../client/solrj/cloud/autoscaling/Policy.java | 504 ++++++++++
.../solrj/cloud/autoscaling/PolicyHelper.java | 96 ++
.../solrj/cloud/autoscaling/Preference.java | 92 ++
.../solrj/cloud/autoscaling/ReplicaCount.java | 92 ++
.../solrj/cloud/autoscaling/ReplicaInfo.java | 64 ++
.../client/solrj/cloud/autoscaling/Row.java | 139 +++
.../solrj/cloud/autoscaling/package-info.java | 23 +
.../solrj/impl/SolrClientDataProvider.java | 4 +-
.../solrj/request/CollectionAdminRequest.java | 2 +-
.../cloud/autoscaling/AddReplicaSuggester.java | 73 --
.../org/apache/solr/cloud/autoscaling/Cell.java | 57 --
.../apache/solr/cloud/autoscaling/Clause.java | 467 ---------
.../cloud/autoscaling/ClusterDataProvider.java | 52 -
.../cloud/autoscaling/MoveReplicaSuggester.java | 83 --
.../apache/solr/cloud/autoscaling/Operand.java | 124 ---
.../apache/solr/cloud/autoscaling/Policy.java | 502 ----------
.../solr/cloud/autoscaling/PolicyHelper.java | 96 --
.../solr/cloud/autoscaling/Preference.java | 88 --
.../solr/cloud/autoscaling/ReplicaCount.java | 92 --
.../solr/cloud/autoscaling/ReplicaInfo.java | 64 --
.../org/apache/solr/cloud/autoscaling/Row.java | 135 ---
.../solr/cloud/autoscaling/package-info.java | 23 -
.../apache/solr/common/cloud/DocCollection.java | 2 +-
.../solrj/cloud/autoscaling/TestPolicy.java | 973 +++++++++++++++++++
.../solr/cloud/autoscaling/TestPolicy.java | 972 ------------------
102 files changed, 4551 insertions(+), 4146 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/00ad44c7/solr/core/src/java/org/apache/solr/cloud/autoscaling/ComputePlanAction.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/00ad44c7/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
----------------------------------------------------------------------
diff --cc solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
index 0000000,451c514..c050341
mode 000000,100644..100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
@@@ -1,0 -1,499 +1,504 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.solr.client.solrj.cloud.autoscaling;
+
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.Collections;
+ import java.util.EnumMap;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.LinkedHashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Objects;
+ import java.util.Set;
+ import java.util.SortedSet;
+ import java.util.TreeSet;
+ import java.util.function.Supplier;
+ import java.util.stream.Collectors;
+
+ import org.apache.solr.client.solrj.SolrRequest;
+ import org.apache.solr.client.solrj.cloud.autoscaling.Clause.Violation;
+ import org.apache.solr.common.IteratorWriter;
+ import org.apache.solr.common.MapWriter;
+ import org.apache.solr.common.params.CollectionParams.CollectionAction;
+ import org.apache.solr.common.util.Pair;
+ import org.apache.solr.common.util.StrUtils;
+ import org.apache.solr.common.util.Utils;
+
+ import static java.util.Collections.emptyList;
+ import static java.util.Collections.emptyMap;
+ import static java.util.stream.Collectors.toList;
+
+ /* The class that reads, parses and applies the policies specified in
+ * autoscaling.json.
+ *
+ * Create one instance of this class per unique autoscaling.json.
+ * This class is immutable and thread-safe.
+ *
+ * Create a fresh new session for each use.
+ *
+ */
+ public class Policy implements MapWriter {
+ public static final String POLICY = "policy";
+ public static final String EACH = "#EACH";
+ public static final String ANY = "#ANY";
+ public static final String CLUSTER_POLICY = "cluster-policy"; // JSON key the constructor reads the cluster-wide clauses from
+ public static final String CLUSTER_PREFERENCE = "cluster-preferences"; // JSON key the constructor reads the sort preferences from
+ public static final Set<String> GLOBAL_ONLY_TAGS = Collections.singleton("cores"); // NOTE(review): not referenced in this class; presumably tags allowed only in cluster-policy - confirm
+ final Map<String, List<Clause>> policies = new HashMap<>(); // named policies: policy name -> sorted list of clauses
+ final List<Clause> clusterPolicy; // clauses parsed from the "cluster-policy" entry
+ final List<Preference> clusterPreferences; // parsed preferences; each one is linked to its successor through Preference.next
+ final List<String> params; // sorted names of the parameters the preferences refer to
+
+
+ /**
+  * Builds a policy from the parsed autoscaling configuration.
+  *
+  * <p>Three optional entries are read: {@code cluster-preferences} (list of preference maps),
+  * {@code cluster-policy} (list of clause maps) and {@code policies} (policy name to list of
+  * clause maps). When no preference is configured, the single default preference
+  * {@code {minimize : cores, precision:1}} is used.
+  *
+  * @param jsonMap the parsed configuration
+  * @throws RuntimeException if a preference name is repeated, or if a named policy contains a
+  *         clause for which {@code isPerCollectiontag()} is false
+  */
+ public Policy(Map<String, Object> jsonMap) {
+   List<Map<String, Object>> rawPreferences =
+       (List<Map<String, Object>>) jsonMap.getOrDefault(CLUSTER_PREFERENCE, emptyList());
+   clusterPreferences = rawPreferences.stream().map(Preference::new).collect(toList());
+
+   // chain every preference to the one that follows it
+   Preference previous = null;
+   for (Preference current : clusterPreferences) {
+     if (previous != null) {
+       previous.next = current;
+     }
+     previous = current;
+   }
+   if (clusterPreferences.isEmpty()) {
+     Map<String, Object> defaultPreference =
+         (Map<String, Object>) Utils.fromJSONString("{minimize : cores, precision:1}");
+     clusterPreferences.add(new Preference(defaultPreference));
+   }
+
+   // collect the (sorted, unique) parameter names used by the preferences
+   SortedSet<String> uniqueNames = new TreeSet<>();
+   for (Preference pref : clusterPreferences) {
+     if (uniqueNames.contains(pref.name.name())) {
+       throw new RuntimeException(pref.name + " is repeated");
+     }
+     uniqueNames.add(pref.name.toString());
+   }
+   this.params = new ArrayList<>(uniqueNames);
+
+   List<Map<String, Object>> rawClusterPolicy =
+       (List<Map<String, Object>>) jsonMap.getOrDefault(CLUSTER_POLICY, emptyList());
+   List<Clause> globalClauses = new ArrayList<>();
+   for (Map<String, Object> rawClause : rawClusterPolicy) {
+     Clause clause = new Clause(rawClause);
+     clause.addTags(params);
+     globalClauses.add(clause);
+   }
+   clusterPolicy = globalClauses;
+
+   Map<String, List<Map<String, Object>>> rawPolicies =
+       (Map<String, List<Map<String, Object>>>) jsonMap.getOrDefault("policies", emptyMap());
+   for (Map.Entry<String, List<Map<String, Object>>> named : rawPolicies.entrySet()) {
+     List<Clause> clauses = new ArrayList<>();
+     for (Map<String, Object> rawClause : named.getValue()) {
+       Clause clause = new Clause(rawClause);
+       if (!clause.isPerCollectiontag()) {
+         throw new RuntimeException(clause.globalTag.name + " is only allowed in 'cluster-policy'");
+       }
+       clause.addTags(params);
+       clauses.add(clause);
+     }
+     Collections.sort(clauses);
+     this.policies.put(named.getKey(), clauses);
+   }
+ }
+
+ public List<Clause> getClusterPolicy() {
+ return clusterPolicy; // the internal list itself (clauses parsed from "cluster-policy"), not a copy
+ }
+
+ public List<Preference> getClusterPreferences() {
+ return clusterPreferences; // the internal list itself; never empty because the constructor adds a default preference
+ }
+
+ @Override
+ public void writeMap(EntryWriter ew) throws IOException {
+ if (!policies.isEmpty()) {
+ ew.put("policies", (MapWriter) ew1 -> {
+ for (Map.Entry<String, List<Clause>> e : policies.entrySet()) {
+ ew1.put(e.getKey(), e.getValue());
+ }
+ });
+ }
+ if (!clusterPreferences.isEmpty()) {
+ ew.put("preferences", (IteratorWriter) iw -> {
+ for (Preference p : clusterPreferences) iw.add(p);
+ });
+ }
+
+ }
+
+ /* Stores the logical state of the system, given a policy and
+ * a cluster state.
+ *
+ */
+ public class Session implements MapWriter {
+ final List<String> nodes; // names of the nodes this session works on
+ final ClusterDataProvider dataProvider; // source of node, replica and policy-name information
+ final List<Row> matrix; // one Row per node; sorted by applyRules()
+ Set<String> collections = new HashSet<>(); // collection names gathered from the replica info of every node
+ List<Clause> expandedClauses; // global clauses plus the merged per-collection clauses, kept sorted
+ List<Violation> violations = new ArrayList<>(); // filled by applyRules()
+
+ private Session(List<String> nodes, ClusterDataProvider dataProvider, // used by copy(): stores the given references as they are
+ List<Row> matrix, List<Clause> expandedClauses) {
+ this.nodes = nodes;
+ this.dataProvider = dataProvider;
+ this.matrix = matrix;
+ this.expandedClauses = expandedClauses; // note: applyRules() is not called here, so violations starts out empty
+ }
+
+ /**
+  * Builds a session from the given cluster data: collects the known collections, expands the
+  * clauses that apply to them, creates one {@link Row} per node and finally applies the rules.
+  *
+  * @param dataProvider source of the node list, replica info and per-collection policy names
+  */
+ Session(ClusterDataProvider dataProvider) {
+   this.nodes = new ArrayList<>(dataProvider.getNodes());
+   this.dataProvider = dataProvider;
+
+   // every collection that has replica info on at least one node
+   for (String nodeName : nodes) {
+     collections.addAll(dataProvider.getReplicaInfo(nodeName, Collections.emptyList()).keySet());
+   }
+
+   // start with the cluster-policy clauses that are not per-collection ...
+   expandedClauses = new ArrayList<>();
+   for (Clause globalClause : clusterPolicy) {
+     if (!globalClause.isPerCollectiontag()) {
+       expandedClauses.add(globalClause);
+     }
+   }
+   // ... then add the merged clauses of each collection
+   for (String collection : collections) {
+     addClausesForCollection(dataProvider, collection);
+   }
+   Collections.sort(expandedClauses);
+
+   matrix = new ArrayList<>(nodes.size());
+   for (String nodeName : nodes) {
+     matrix.add(new Row(nodeName, params, dataProvider));
+   }
+   applyRules();
+ }
+
+ /**
+  * Appends to {@code expandedClauses} the clauses that apply to collection {@code c}: its own
+  * named policy (if it has one) merged with the cluster policy.
+  *
+  * @throws RuntimeException if the collection names a policy that does not exist
+  */
+ private void addClausesForCollection(ClusterDataProvider dataProvider, String c) {
+   String policyName = dataProvider.getPolicyNameByCollection(c);
+   List<Clause> collectionPolicy = policies.get(policyName);
+   if (collectionPolicy == null) {
+     if (policyName != null) {
+       throw new RuntimeException(StrUtils.formatString("Policy for collection {0} is {1} . It does not exist", c, policyName));
+     }
+     // no policy assigned: only the cluster policy applies
+     collectionPolicy = emptyList();
+   }
+   expandedClauses.addAll(mergePolicies(c, collectionPolicy, clusterPolicy));
+ }
+
+ /**
+  * Creates a working copy of this session with its own rows and its own clause list.
+  *
+  * <p>The clause list is copied because {@code Suggester.getOperation()} appends to and
+  * re-sorts the copy's {@code expandedClauses}; sharing the list would silently modify the
+  * session the copy was taken from. The node list and the data provider are shared.
+  *
+  * @return a new session whose violations are empty (rules are not applied by this method)
+  */
+ Session copy() {
+   return new Session(nodes, dataProvider, getMatrixCopy(), new ArrayList<>(expandedClauses));
+ }
+
+ /** Returns a new list holding a {@code copy()} of every row, in the current row order. */
+ List<Row> getMatrixCopy() {
+   List<Row> copies = new ArrayList<>();
+   for (Row row : matrix) {
+     copies.add(row.copy());
+   }
+   return copies;
+ }
+
+ Policy getPolicy() {
+ return Policy.this; // the enclosing Policy instance that created this session
+
+ }
+
+ /**
+  * Apply the preferences and conditions: computes the approximate values for every
+  * preference, sorts the matrix by the first preference and then collects the violations
+  * reported by each expanded clause.
+  */
+ private void applyRules() {
+   if (!clusterPreferences.isEmpty()) {
+     // the approximate values (per the precision) are computed on a scratch list so the
+     // intermediate sorts do not reorder the real matrix
+     ArrayList<Row> scratch = new ArrayList<>(matrix);
+     for (Preference preference : clusterPreferences) {
+       scratch.sort((left, right) -> preference.compare(left, right, false));
+       preference.setApproxVal(scratch);
+     }
+     // approximate values are known now: sort on them first and fall back to the exact
+     // comparison of the first preference on a tie
+     Preference first = clusterPreferences.get(0);
+     matrix.sort((Row left, Row right) -> {
+       int byApprox = first.compare(left, right, true);
+       return byApprox != 0 ? byApprox : first.compare(left, right, false);
+     });
+   }
+
+   for (Clause clause : expandedClauses) {
+     violations.addAll(clause.test(matrix));
+   }
+ }
+
+ public List<Violation> getViolations() {
+ return violations; // the internal list populated by applyRules(), not a copy
+ }
+
+ /**
+  * Creates a suggester for the given collection action, initialized with a copy of this session.
+  *
+  * @param action the action to suggest; only actions registered in {@code ops} are supported
+  * @return a new, initialized suggester
+  * @throws UnsupportedOperationException if no suggester is registered for {@code action}
+  */
+ public Suggester getSuggester(CollectionAction action) {
+   // look up the factory first: calling get() on a missing entry would fail with a
+   // NullPointerException instead of the intended UnsupportedOperationException
+   Supplier<Suggester> factory = ops.get(action);
+   if (factory == null) throw new UnsupportedOperationException(action + " is not supported");
+   Suggester op = factory.get();
+   op._init(this);
+   return op;
+ }
+
+ /** Writes one entry per row, keyed by the row's node name, in the current matrix order. */
+ @Override
+ public void writeMap(EntryWriter ew) throws IOException {
+   for (Row row : matrix) {
+     ew.put(row.node, row);
+   }
+ }
+
+ @Override
+ public String toString() {
+ return Utils.toJSONString(toMap(new LinkedHashMap<>())); // JSON form of what writeMap() emits
+ }
+
+ public List<Row> getSorted() {
+ return Collections.unmodifiableList(matrix); // read-only view of the rows in their current (sorted) order
+ }
+ }
+
+
+ public Session createSession(ClusterDataProvider dataProvider) {
+ return new Session(dataProvider); // a new session per call; the constructor already applies the rules
+ }
+
+ /** The node parameters a preference can sort on, each with its {@code min} and {@code max} bound. */
+ public enum SortParam {
+   freedisk(0, Integer.MAX_VALUE),
+   cores(0, Integer.MAX_VALUE),
+   heapUsage(0, Integer.MAX_VALUE),
+   sysLoadAvg(0, 100);
+
+   public final int min;
+   public final int max;
+
+   SortParam(int min, int max) {
+     this.min = min;
+     this.max = max;
+   }
+
+   /**
+    * Looks up a constant by its exact name.
+    *
+    * @throws RuntimeException if no constant has that name
+    */
+   static SortParam get(String m) {
+     return Arrays.stream(values())
+         .filter(candidate -> candidate.name().equals(m))
+         .findFirst()
+         .orElseThrow(() -> new RuntimeException(StrUtils.formatString("Invalid sort {0} Sort must be on one of these {1}", m, Arrays.asList(values()))));
+   }
+ }
+
+ enum Sort {
+ maximize(1), minimize(-1);
+ final int sortval;
+
+ Sort(int i) {
+ sortval = i;
+ }
+
+ static Sort get(Map<String, Object> m) {
+ if (m.containsKey(maximize.name()) && m.containsKey(minimize.name())) {
+ throw new RuntimeException("Cannot have both 'maximize' and 'minimize'");
+ }
+ if (m.containsKey(maximize.name())) return maximize;
+ if (m.containsKey(minimize.name())) return minimize;
+ throw new RuntimeException("must have either 'maximize' or 'minimize'");
+ }
+ }
+
+
+ /* A suggester is capable of suggesting a collection operation
+ * given a particular session. Before it suggests a new operation,
+ * it ensures that:
+ * a) load is reduced on the most loaded node
+ * b) it causes no new violations
+ *
+ */
+ public static abstract class Suggester {
+ protected final EnumMap<Hint, Object> hints = new EnumMap<>(Hint.class); // hint -> Set of values (COLL, SRC_NODE, TARGET_NODE) or a single String, see hint()
+ Policy.Session session; // working copy of the session, assigned in _init()
+ SolrRequest operation; // result of init(), cached by getOperation()
+ protected List<Violation> originalViolations = new ArrayList<>(); // violations that exist before the operation; filled in getOperation()
+ private boolean isInitialized = false; // set to true once getOperation() has computed the operation
+
+ private void _init(Session session) {
+ this.session = session.copy(); // work on a copy so the suggester does not touch the caller's rows
+ }
+
+ public Suggester hint(Hint hint, Object value) { // records a hint and returns this suggester so calls can be chained
- if (hint == Hint.TARGET_NODE || hint == Hint.SRC_NODE) {
++ if (hint == Hint.TARGET_NODE || hint == Hint.SRC_NODE || hint == Hint.COLL) {
+ ((Set) hints.computeIfAbsent(hint, h -> new HashSet<>())).add(value); // multi-valued hints accumulate into a set
+ } else {
+ hints.put(hint, value == null ? null : String.valueOf(value)); // single-valued hints are stored as a String (or null), replacing any earlier value
+ }
+ return this;
+ }
+
+ abstract SolrRequest init(); // subclass hook: called once by getOperation() after the session is prepared; its result becomes the cached operation
+
+
+ public SolrRequest getOperation() { // computes the operation on the first call and returns the cached result afterwards
+ if (!isInitialized) {
- String coll = (String) hints.get(Hint.COLL);
++ Set<String> collections = new HashSet<>();
++ if (hints.containsKey(Hint.COLL)) {
++ collections = (Set<String>) hints.get(Hint.COLL);
++ }
+ String shard = (String) hints.get(Hint.SHARD);
+ // For each hinted collection that no row knows about yet (i.e. it is not in the
+ // existing cluster state), add its policy clauses to the session.
- if (session.matrix.stream().noneMatch(row -> row.collectionVsShardVsReplicas.containsKey(coll))) {
- session.addClausesForCollection(session.dataProvider, coll);
- Collections.sort(session.expandedClauses);
- }
- if (coll != null) {
- for (Row row : session.matrix) {
- if (!row.collectionVsShardVsReplicas.containsKey(coll)) row.collectionVsShardVsReplicas.put(coll, new HashMap<>());
- if (shard != null) {
- Map<String, List<ReplicaInfo>> shardInfo = row.collectionVsShardVsReplicas.get(coll);
- if (!shardInfo.containsKey(shard)) shardInfo.put(shard, new ArrayList<>());
++ for (String coll : collections) {
++ if (session.matrix.stream().noneMatch(row -> row.collectionVsShardVsReplicas.containsKey(coll))) {
++ session.addClausesForCollection(session.dataProvider, coll);
++ }
++ if (coll != null) {
++ for (Row row : session.matrix) {
++ if (!row.collectionVsShardVsReplicas.containsKey(coll)) row.collectionVsShardVsReplicas.put(coll, new HashMap<>());
++ if (shard != null) {
++ Map<String, List<ReplicaInfo>> shardInfo = row.collectionVsShardVsReplicas.get(coll);
++ if (!shardInfo.containsKey(shard)) shardInfo.put(shard, new ArrayList<>());
++ }
+ }
+ }
+ }
++ Collections.sort(session.expandedClauses);
+ Set<String> srcNodes = (Set<String>) hints.get(Hint.SRC_NODE);
+ if (srcNodes != null && !srcNodes.isEmpty()) {
+ // a source node may be dead and therefore missing from the live nodes; add a row for it
+ for (String srcNode : srcNodes) {
+ if(session.matrix.stream().noneMatch(row -> row.node.equals(srcNode)))
+ session.matrix.add(new Row(srcNode, session.getPolicy().params, session.dataProvider));
+ }
+ }
+ session.applyRules();
+ originalViolations.addAll(session.getViolations()); // baseline that containsNewErrors() compares against
+ this.operation = init();
+ isInitialized = true;
+ }
+ return operation;
+ }
+
+ public Session getSession() {
+ return session; // the suggester's own working copy, not the session it was created from
+ }
+
+ List<Row> getMatrix() {
+ return session.matrix; // the rows of the working session (live list)
+
+ }
+
+ /**
+  * Checks whether the fresh set of violations is less serious than the last set.
+  *
+  * <p>That is the case when there is no old set, when there are fewer fresh violations, or
+  * when the counts are equal and at least one fresh violation is less serious than the old
+  * violation it equals.
+  */
+ boolean isLessSerious(List<Violation> fresh, List<Violation> old) {
+   if (old == null) return true;
+   if (fresh.size() < old.size()) return true;
+   if (fresh.size() != old.size()) return false;
+
+   for (Violation candidate : fresh) {
+     // the last equal entry of the old list wins
+     Violation counterpart = null;
+     for (Violation previous : old) {
+       if (previous.equals(candidate)) {
+         counterpart = previous;
+       }
+     }
+     if (counterpart != null && candidate.isLessSerious(counterpart)) return true;
+   }
+   return false;
+ }
+
+ /**
+  * Returns true when any of the given violations is either absent from the original
+  * violations or is one that the matching original violation is less serious than.
+  */
+ boolean containsNewErrors(List<Violation> violations) {
+   return violations.stream().anyMatch(current -> {
+     int pos = originalViolations.indexOf(current);
+     return pos < 0 || originalViolations.get(pos).isLessSerious(current);
+   });
+ }
+
+ /**
+  * Collects candidate replicas from the rows of the matrix.
+  *
+  * @param sortDesc when true the rows are visited from index 0 up to {@code until} (exclusive);
+  *                 when false from the last row down to index {@code until} (inclusive)
+  * @param isSource selects whether rows are filtered by the SRC_NODE or the TARGET_NODE hint
+  * @param until    the bound described above; -1 means "visit every row"
+  */
+ List<Pair<ReplicaInfo, Row>> getValidReplicas(boolean sortDesc, boolean isSource, int until) {
+   List<Pair<ReplicaInfo, Row>> candidates = new ArrayList<>();
+   List<Row> rows = getMatrix();
+
+   if (sortDesc) {
+     int end = (until == -1) ? rows.size() : until;
+     for (int idx = 0; idx < end; idx++) {
+       addReplicaToList(rows.get(idx), isSource, candidates);
+     }
+   } else {
+     int stop = (until == -1) ? 0 : until;
+     for (int idx = rows.size() - 1; idx >= stop; idx--) {
+       addReplicaToList(rows.get(idx), isSource, candidates);
+     }
+   }
+   return candidates;
+ }
+
+ /**
+  * Appends the candidate replicas hosted on row {@code r} to {@code replicaList}.
+  *
+  * <p>The row is skipped entirely when its node is not allowed by the SRC_NODE hint (if
+  * {@code isSource}) or the TARGET_NODE hint (otherwise). Collections and shards are filtered
+  * by the COLL and SHARD hints, and for every remaining non-empty shard the first replica is
+  * added together with the row.
+  *
+  * @param r           the row (node) to inspect
+  * @param isSource    whether the row is considered as a source or as a target
+  * @param replicaList the list the (replica, row) pairs are appended to
+  */
+ void addReplicaToList(Row r, boolean isSource, List<Pair<ReplicaInfo, Row>> replicaList) {
+   if (!isAllowed(r.node, isSource ? Hint.SRC_NODE : Hint.TARGET_NODE)) return;
+   for (Map.Entry<String, Map<String, List<ReplicaInfo>>> e : r.collectionVsShardVsReplicas.entrySet()) {
+     if (!isAllowed(e.getKey(), Hint.COLL)) continue;
+     for (Map.Entry<String, List<ReplicaInfo>> shard : e.getValue().entrySet()) {
+       // the SHARD hint has to be matched against the shard name; matching it against the
+       // collection name rejected every shard whenever a SHARD hint was set
+       if (!isAllowed(shard.getKey(), Hint.SHARD)) continue;
+       List<ReplicaInfo> replicas = shard.getValue();
+       if (replicas == null || replicas.isEmpty()) continue;
+       replicaList.add(new Pair<>(replicas.get(0), r));
+     }
+   }
+ }
+
+ /**
+  * Tests the session's expanded clauses against the given rows.
+  *
+  * @param strict when true every clause is tested; when false only clauses whose own
+  *               {@code strict} flag is set are tested
+  * @param rows   the (possibly modified) matrix to test
+  * @return all violations reported by the tested clauses
+  */
+ protected List<Violation> testChangedMatrix(boolean strict, List<Row> rows) {
+   List<Violation> found = new ArrayList<>();
+   for (Clause clause : session.expandedClauses) {
+     if (!strict && !clause.strict) continue;
+     found.addAll(clause.test(rows));
+   }
+   return found;
+ }
+
+ ArrayList<Row> getModifiedMatrix(List<Row> matrix, Row tmpRow, int i) { // returns a shallow copy of matrix with the row at index i replaced
+ ArrayList<Row> copy = new ArrayList<>(matrix); // the list passed in is left unchanged
+ copy.set(i, tmpRow);
+ return copy;
+ }
+
+ protected boolean isAllowed(Object v, Hint hint) { // true when no value was hinted for this hint, or when v matches the hinted value(s)
+ Object hintVal = hints.get(hint);
- if (hint == Hint.TARGET_NODE || hint == Hint.SRC_NODE) {
++ if (hint == Hint.TARGET_NODE || hint == Hint.SRC_NODE || hint == Hint.COLL) {
+ Set set = (Set) hintVal; // these hints are stored as sets by hint()
+ return set == null || set.contains(v);
+ } else {
+ return hintVal == null || Objects.equals(v, hintVal);
+ }
+ }
+
+ public enum Hint {
+ COLL, SHARD, SRC_NODE, TARGET_NODE, REPLICATYPE // COLL, SRC_NODE and TARGET_NODE are multi-valued; the others hold a single value (see hint())
+ }
+
+
+ }
+
+ /**
+  * Merges a collection's own policy with the global policy for collection {@code coll}.
+  *
+  * <p>Both lists are first passed through {@code insertColl}. The result holds all of the
+  * collection's clauses, followed by those global clauses that none of the collection's
+  * clauses overrides.
+  */
+ public static List<Clause> mergePolicies(String coll,
+                                          List<Clause> collPolicy,
+                                          List<Clause> globalPolicy) {
+   List<Clause> result = insertColl(coll, collPolicy);
+   List<Clause> inherited = new ArrayList<>();
+   for (Clause globalClause : insertColl(coll, globalPolicy)) {
+     boolean overridden = false;
+     for (Clause own : result) {
+       if (own.doesOverride(globalClause)) {
+         overridden = true;
+         break;
+       }
+     }
+     if (!overridden) {
+       inherited.add(globalClause);
+     }
+   }
+   // appended only after the scan so inherited clauses never take part in the override check
+   result.addAll(inherited);
+   return result;
+ }
+
+ /**
+  * Insert the collection name into the clauses where collection is not specified.
+  *
+  * <p>Only clauses for which {@code isPerCollectiontag()} is true are considered. Each one is
+  * re-created from a copy of its original map, and kept only if its collection condition
+  * passes for {@code coll}.
+  */
+ static List<Clause> insertColl(String coll, Collection<Clause> conditions) {
+   List<Clause> result = new ArrayList<>();
+   for (Clause condition : conditions) {
+     if (!condition.isPerCollectiontag()) continue;
+     Map<String, Object> definition = new LinkedHashMap<>(condition.original);
+     if (!definition.containsKey("collection")) {
+       definition.put("collection", coll);
+     }
+     Clause rebuilt = new Clause(definition);
+     if (rebuilt.collection.isPass(coll)) {
+       result.add(rebuilt);
+     }
+   }
+   return result;
+
+ }
+
+ /** Registry of the supported collection actions and the factory of their suggester. */
+ private static final Map<CollectionAction, Supplier<Suggester>> ops = new HashMap<>();
+
+ static {
+   ops.put(CollectionAction.ADDREPLICA, AddReplicaSuggester::new);
+   ops.put(CollectionAction.MOVEREPLICA, MoveReplicaSuggester::new);
+ }
+
+ public Map<String, List<Clause>> getPolicies() {
+ return policies; // the internal map (policy name -> clauses), not a copy
+ }
+ }