You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/02/26 15:59:43 UTC
[lucene-solr] 01/02: SOLR-14275: More optimizations. Some
TestPolicy* tests are failing now.
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch jira/solr-14275
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 219b33b0371eccb03d0ebc2e1d9b129cb353542b
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Wed Feb 26 15:01:37 2020 +0100
SOLR-14275: More optimizations. Some TestPolicy* tests are failing now.
---
.../client/solrj/cloud/autoscaling/Clause.java | 30 +++++++---
.../solrj/cloud/autoscaling/FreeDiskVariable.java | 29 ++++++---
.../client/solrj/cloud/autoscaling/Policy.java | 68 ++++++++++++++++++++++
.../solrj/cloud/autoscaling/ReplicaVariable.java | 28 ++++++---
.../client/solrj/cloud/autoscaling/Suggester.java | 4 +-
5 files changed, 130 insertions(+), 29 deletions(-)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java
index 5bdb46f..474d67b 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java
@@ -461,6 +461,10 @@ public class Clause implements MapWriter, Comparable<Clause> {
}
List<Violation> testGroupNodes(Policy.Session session, double[] deviations) {
+ return testGroupNodes(session, null, deviations);
+ }
+
+ List<Violation> testGroupNodes(Policy.Session session, Row changedRow, double[] deviations) {
//e.g: {replica:'#EQUAL', shard:'#EACH', sysprop.zone:'#EACH'}
ComputedValueEvaluator eval = new ComputedValueEvaluator(session);
eval.collName = (String) collection.getValue();
@@ -469,7 +473,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
Set tags = getUniqueTags(session, eval);
if (tags.isEmpty()) return Collections.emptyList();
- Set<String> shards = getShardNames(session, eval);
+ Set<String> shards = getShardNames(session, changedRow, eval);
for (String s : shards) {
final ReplicaCount replicaCount = new ReplicaCount();
@@ -477,7 +481,8 @@ public class Clause implements MapWriter, Comparable<Clause> {
for (Object tag : tags) {
replicaCount.reset();
- for (Row row : session.matrix) {
+ List<Row> rows = changedRow != null ? Collections.singletonList(changedRow) : session.matrix;
+ for (Row row : rows) {
if(!isRowPass(eval, tag, row)) continue;
addReplicaCountsForNode(eval, replicaCount, row);
}
@@ -598,14 +603,19 @@ public class Clause implements MapWriter, Comparable<Clause> {
}
List<Violation> testPerNode(Policy.Session session, double[] deviations) {
+ return testPerNode(session, null, deviations);
+ }
+
+ List<Violation> testPerNode(Policy.Session session, Row changedRow, double[] deviations) {
ComputedValueEvaluator eval = new ComputedValueEvaluator(session);
eval.collName = (String) collection.getValue();
Violation.Ctx ctx = new Violation.Ctx(this, session.matrix, eval);
- Set<String> shards = getShardNames(session, eval);
+ Set<String> shards = getShardNames(session, changedRow, eval);
for (String s : shards) {
final ReplicaCount replicaCount = new ReplicaCount();
eval.shardName = s;
- for (Row row : session.matrix) {
+ List<Row> rows = changedRow != null ? Collections.singletonList(changedRow) : session.matrix;
+ for (Row row : rows) {
replicaCount.reset();
eval.node = row.node;
Condition tag = this.tag;
@@ -639,12 +649,14 @@ public class Clause implements MapWriter, Comparable<Clause> {
}
private Set<String> getShardNames(Policy.Session session,
+ Row changedRow,
ComputedValueEvaluator eval) {
Set<String> shards = new HashSet<>();
if (isShardAbsent()) {
shards.add(Policy.ANY); //consider the entire collection is a single shard
} else {
- for (Row row : session.matrix) {
+ List<Row> rows = changedRow != null ? Collections.singletonList(changedRow) : session.matrix;
+ for (Row row : rows) {
row.forEachShard(eval.collName, (shard, r) -> {
if (this.shard.isPass(shard)) shards.add(shard); // add relevant shards
});
@@ -665,16 +677,16 @@ public class Clause implements MapWriter, Comparable<Clause> {
if (isPerCollectiontag()) {
if(nodeSetPresent) {
if(put == Put.ON_EACH){
- return testPerNode(session, deviations) ;
+ return testPerNode(session, changedRow, deviations) ;
} else {
- return testGroupNodes(session, deviations);
+ return testGroupNodes(session, changedRow, deviations);
}
}
return tag.varType == Type.NODE ||
(tag.varType.meta.isNodeSpecificVal() && replica.computedType == null) ?
- testPerNode(session, deviations) :
- testGroupNodes(session, deviations);
+ testPerNode(session, changedRow, deviations) :
+ testGroupNodes(session, changedRow, deviations);
} else {
ComputedValueEvaluator computedValueEvaluator = new ComputedValueEvaluator(session);
Violation.Ctx ctx = new Violation.Ctx(this, session.matrix, computedValueEvaluator);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/FreeDiskVariable.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/FreeDiskVariable.java
index 2193ea3..a3a14dd 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/FreeDiskVariable.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/FreeDiskVariable.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -144,20 +145,30 @@ public class FreeDiskVariable extends VariableBase {
}
- //When a replica is added, freedisk should be incremented
+ //When a replica is added, freedisk should be decremented
@Override
public void projectAddReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo> ops, boolean strictMode) {
//go through other replicas of this shard and copy the index size value into this
+ AtomicBoolean indexSizeSet = new AtomicBoolean();
for (Row row : cell.getRow().session.matrix) {
- row.forEachReplica(replicaInfo -> {
- if (ri != replicaInfo &&
- ri.getCollection().equals(replicaInfo.getCollection()) &&
- ri.getShard().equals(replicaInfo.getShard()) &&
- ri.getVariable(CORE_IDX.tagName) == null &&
- replicaInfo.getVariable(CORE_IDX.tagName) != null) {
- ri.getVariables().put(CORE_IDX.tagName, validate(CORE_IDX.tagName, replicaInfo.getVariable(CORE_IDX.tagName), false));
- }
+ Object indexSize = row.computeCacheIfAbsent(ri.getCollection(), ri.getShard(), "freedisk", CORE_IDX.tagName, o -> {
+ Object[] result = new Object[1];
+ row.forEachShard(ri.getCollection(), (shard, replicas) -> {
+ if (ri.getShard().equals(shard)) {
+ for (ReplicaInfo replicaInfo : replicas) {
+ if (replicaInfo.getVariable(CORE_IDX.tagName) != null) {
+ result[0] = replicaInfo.getVariable(CORE_IDX.tagName);
+ return;
+ }
+ }
+ }
+ });
+ return result[0];
});
+ if (indexSize != null) {
+ ri.getVariables().put(CORE_IDX.tagName, validate(CORE_IDX.tagName, indexSize, false));
+ break;
+ }
}
Double idxSize = (Double) validate(CORE_IDX.tagName, ri.getVariable(CORE_IDX.tagName), false);
if (idxSize == null) return;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
index afe04b5..0cb3cb3 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
@@ -335,6 +335,74 @@ public class Policy implements MapWriter {
}
}
+ static void setChangedRowOrder(List<Preference> clusterPreferences, Row changedRow, List<Row> matrix) {
+ List<Row> matrixCopy = new ArrayList<>(matrix);
+ List<Row> deadNodes = null;
+ Iterator<Row> it =matrix.iterator();
+ Row rowToInsert = null;
+ while (it.hasNext()){
+ Row row = it.next();
+ if(!row.isLive){
+ if(deadNodes == null) deadNodes = new ArrayList<>();
+ deadNodes.add(row);
+ it.remove();
+ } else {
+ // remove the changed row from the matrix
+ if (row == changedRow) {
+ rowToInsert = row;
+ it.remove();
+ }
+ }
+ }
+ if (rowToInsert == null) {
+ throw new RuntimeException("the changed row is missing from the matrix! " + changedRow);
+ }
+
+ if (!clusterPreferences.isEmpty()) {
+ //this is to set the approximate value according to the precision
+ ArrayList<Row> tmpMatrix = new ArrayList<>(matrix);
+ Row[] lastComparison = new Row[2];
+ for (Preference p : clusterPreferences) {
+ try {
+ tmpMatrix.sort((r1, r2) -> {
+ lastComparison[0] = r1;
+ lastComparison[1] = r2;
+ return p.compare(r1, r2, false);
+ });
+ } catch (Exception e) {
+ try {
+ Map m = Collections.singletonMap("diagnostics", (MapWriter) ew -> {
+ PolicyHelper.writeNodes(ew, matrixCopy);
+ ew.put("config", matrix.get(0).session.getPolicy());
+ });
+ log.error("Exception! prefs = {}, recent r1 = {}, r2 = {}, matrix = {}",
+ clusterPreferences,
+ lastComparison[0].node,
+ lastComparison[1].node,
+ Utils.writeJson(m, new StringWriter(), true).toString());
+ } catch (IOException e1) {
+ //
+ }
+ throw new RuntimeException(e.getMessage());
+ }
+ p.setApproxVal(tmpMatrix);
+ }
+ // the tmpMatrix was needed only to set the approximate values, now we sort the real matrix
+ // recursing through each preference
+ matrix.sort((Row r1, Row r2) -> {
+ int result = clusterPreferences.get(0).compare(r1, r2, true);
+ if (result == 0) result = clusterPreferences.get(0).compare(r1, r2, false);
+ return result;
+ });
+
+ if(deadNodes != null){
+ for (Row deadNode : deadNodes) {
+ matrix.add(0, deadNode);
+ }
+ }
+ }
+ }
+
/**
* Insert the collection name into the clauses where collection is not specified
*/
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaVariable.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaVariable.java
index 675382a..d31bd2d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaVariable.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaVariable.java
@@ -33,20 +33,32 @@ class ReplicaVariable extends VariableBase {
public static final String REPLICASCOUNT = "relevantReplicas";
+ private static final class ReplicaCounter {
+ final String collection, shard;
+ final Clause clause;
+
+ ReplicaCounter(String collection, String shard, Clause clause) {
+ this.collection = collection;
+ this.shard = shard;
+ this.clause = clause;
+ }
+ int calculate(Row row) {
+ int[] result = new int[1];
+ row.forEachReplica(collection, replicaInfo -> {
+ if (clause.isMatch(replicaInfo, collection, shard))
+ result[0]++;
+ });
+ return result[0];
+ }
+ }
static int getRelevantReplicasCount(Policy.Session session, Condition cv, String collection, String shard) {
int totalReplicasOfInterest = 0;
Clause clause = cv.getClause();
+ ReplicaCounter counter = new ReplicaCounter(collection, shard, clause);
for (Row row : session.matrix) {
- Integer perShardCount = row.computeCacheIfAbsent(collection, shard, REPLICASCOUNT, cv.clause, o -> {
- int[] result = new int[1];
- row.forEachReplica(collection, replicaInfo -> {
- if (clause.isMatch(replicaInfo, collection, shard))
- result[0]++;
- });
- return result[0];
- });
+ Integer perShardCount = row.computeCacheIfAbsent(collection, shard, REPLICASCOUNT, cv.clause, o -> counter.calculate(row));
if (perShardCount != null)
totalReplicasOfInterest += perShardCount;
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
index bbaabf2..9dc7fec 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
@@ -329,9 +329,7 @@ public abstract class Suggester implements MapWriter {
List<Violation> testChangedMatrix(boolean executeInStrictMode, Row changedRow, Policy.Session session) {
if (this.deviations != null) this.lastBestDeviation = this.deviations;
this.deviations = null;
- if (changedRow != null) {
- Policy.setApproxValuesAndSortNodes(session.getPolicy().clusterPreferences, session.matrix);
- }
+ Policy.setApproxValuesAndSortNodes(session.getPolicy().clusterPreferences, session.matrix);
List<Violation> errors = new ArrayList<>();
for (Clause clause : session.expandedClauses) {
Clause originalClause = clause.derivedFrom == null ? clause : clause.derivedFrom;