You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/02/26 15:59:43 UTC

[lucene-solr] 01/02: SOLR-14275: More optimizations. Some TestPolicy* tests are failing now.

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch jira/solr-14275
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 219b33b0371eccb03d0ebc2e1d9b129cb353542b
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Wed Feb 26 15:01:37 2020 +0100

    SOLR-14275: More optimizations. Some TestPolicy* tests are failing now.
---
 .../client/solrj/cloud/autoscaling/Clause.java     | 30 +++++++---
 .../solrj/cloud/autoscaling/FreeDiskVariable.java  | 29 ++++++---
 .../client/solrj/cloud/autoscaling/Policy.java     | 68 ++++++++++++++++++++++
 .../solrj/cloud/autoscaling/ReplicaVariable.java   | 28 ++++++---
 .../client/solrj/cloud/autoscaling/Suggester.java  |  4 +-
 5 files changed, 130 insertions(+), 29 deletions(-)

diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java
index 5bdb46f..474d67b 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Clause.java
@@ -461,6 +461,10 @@ public class Clause implements MapWriter, Comparable<Clause> {
   }
 
   List<Violation> testGroupNodes(Policy.Session session, double[] deviations) {
+    return testGroupNodes(session, null, deviations);
+  }
+
+  List<Violation> testGroupNodes(Policy.Session session, Row changedRow, double[] deviations) {
     //e.g:  {replica:'#EQUAL', shard:'#EACH',  sysprop.zone:'#EACH'}
     ComputedValueEvaluator eval = new ComputedValueEvaluator(session);
     eval.collName = (String) collection.getValue();
@@ -469,7 +473,7 @@ public class Clause implements MapWriter, Comparable<Clause> {
     Set tags = getUniqueTags(session, eval);
     if (tags.isEmpty()) return Collections.emptyList();
 
-    Set<String> shards = getShardNames(session, eval);
+    Set<String> shards = getShardNames(session, changedRow, eval);
 
     for (String s : shards) {
       final ReplicaCount replicaCount = new ReplicaCount();
@@ -477,7 +481,8 @@ public class Clause implements MapWriter, Comparable<Clause> {
 
       for (Object tag : tags) {
         replicaCount.reset();
-        for (Row row : session.matrix) {
+        List<Row> rows = changedRow != null ? Collections.singletonList(changedRow) : session.matrix;
+        for (Row row : rows) {
          if(!isRowPass(eval, tag, row)) continue;
           addReplicaCountsForNode(eval, replicaCount, row);
         }
@@ -598,14 +603,19 @@ public class Clause implements MapWriter, Comparable<Clause> {
   }
 
   List<Violation> testPerNode(Policy.Session session, double[] deviations) {
+    return testPerNode(session, null, deviations);
+  }
+
+  List<Violation> testPerNode(Policy.Session session, Row changedRow, double[] deviations) {
     ComputedValueEvaluator eval = new ComputedValueEvaluator(session);
     eval.collName = (String) collection.getValue();
     Violation.Ctx ctx = new Violation.Ctx(this, session.matrix, eval);
-    Set<String> shards = getShardNames(session, eval);
+    Set<String> shards = getShardNames(session, changedRow, eval);
     for (String s : shards) {
       final ReplicaCount replicaCount = new ReplicaCount();
       eval.shardName = s;
-      for (Row row : session.matrix) {
+      List<Row> rows = changedRow != null ? Collections.singletonList(changedRow) : session.matrix;
+      for (Row row : rows) {
         replicaCount.reset();
         eval.node = row.node;
         Condition tag = this.tag;
@@ -639,12 +649,14 @@ public class Clause implements MapWriter, Comparable<Clause> {
   }
 
   private Set<String> getShardNames(Policy.Session session,
+                                    Row changedRow,
                                     ComputedValueEvaluator eval) {
     Set<String> shards = new HashSet<>();
     if (isShardAbsent()) {
       shards.add(Policy.ANY); //consider the entire collection is a single shard
     } else {
-      for (Row row : session.matrix) {
+      List<Row> rows = changedRow != null ? Collections.singletonList(changedRow) : session.matrix;
+      for (Row row : rows) {
         row.forEachShard(eval.collName, (shard, r) -> {
           if (this.shard.isPass(shard)) shards.add(shard); // add relevant shards
         });
@@ -665,16 +677,16 @@ public class Clause implements MapWriter, Comparable<Clause> {
     if (isPerCollectiontag()) {
       if(nodeSetPresent) {
         if(put == Put.ON_EACH){
-          return testPerNode(session, deviations) ;
+          return testPerNode(session, changedRow, deviations) ;
         } else {
-          return testGroupNodes(session, deviations);
+          return testGroupNodes(session, changedRow, deviations);
         }
       }
 
       return tag.varType == Type.NODE ||
           (tag.varType.meta.isNodeSpecificVal() && replica.computedType == null) ?
-          testPerNode(session, deviations) :
-          testGroupNodes(session, deviations);
+          testPerNode(session, changedRow, deviations) :
+          testGroupNodes(session, changedRow, deviations);
     } else {
       ComputedValueEvaluator computedValueEvaluator = new ComputedValueEvaluator(session);
       Violation.Ctx ctx = new Violation.Ctx(this, session.matrix, computedValueEvaluator);
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/FreeDiskVariable.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/FreeDiskVariable.java
index 2193ea3..a3a14dd 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/FreeDiskVariable.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/FreeDiskVariable.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -144,20 +145,30 @@ public class FreeDiskVariable extends VariableBase {
 
   }
 
-  //When a replica is added, freedisk should be incremented
+  //When a replica is added, freedisk should be decremented
   @Override
   public void projectAddReplica(Cell cell, ReplicaInfo ri, Consumer<Row.OperationInfo> ops, boolean strictMode) {
     //go through other replicas of this shard and copy the index size value into this
+    AtomicBoolean indexSizeSet = new AtomicBoolean();
     for (Row row : cell.getRow().session.matrix) {
-      row.forEachReplica(replicaInfo -> {
-        if (ri != replicaInfo &&
-            ri.getCollection().equals(replicaInfo.getCollection()) &&
-            ri.getShard().equals(replicaInfo.getShard()) &&
-            ri.getVariable(CORE_IDX.tagName) == null &&
-            replicaInfo.getVariable(CORE_IDX.tagName) != null) {
-          ri.getVariables().put(CORE_IDX.tagName, validate(CORE_IDX.tagName, replicaInfo.getVariable(CORE_IDX.tagName), false));
-        }
+      Object indexSize = row.computeCacheIfAbsent(ri.getCollection(), ri.getShard(), "freedisk", CORE_IDX.tagName, o -> {
+        Object[] result = new Object[1];
+        row.forEachShard(ri.getCollection(), (shard, replicas) -> {
+          if (ri.getShard().equals(shard)) {
+            for (ReplicaInfo replicaInfo : replicas) {
+              if (replicaInfo.getVariable(CORE_IDX.tagName) != null) {
+                result[0] = replicaInfo.getVariable(CORE_IDX.tagName);
+                return;
+              }
+            }
+          }
+        });
+        return result[0];
       });
+      if (indexSize != null) {
+        ri.getVariables().put(CORE_IDX.tagName, validate(CORE_IDX.tagName, indexSize, false));
+        break;
+      }
     }
     Double idxSize = (Double) validate(CORE_IDX.tagName, ri.getVariable(CORE_IDX.tagName), false);
     if (idxSize == null) return;
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
index afe04b5..0cb3cb3 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
@@ -335,6 +335,74 @@ public class Policy implements MapWriter {
     }
   }
 
+  static void setChangedRowOrder(List<Preference> clusterPreferences, Row changedRow, List<Row> matrix) {
+    List<Row> matrixCopy = new ArrayList<>(matrix);
+    List<Row> deadNodes = null;
+    Iterator<Row> it =matrix.iterator();
+    Row rowToInsert = null;
+    while (it.hasNext()){
+      Row row = it.next();
+      if(!row.isLive){
+        if(deadNodes == null) deadNodes = new ArrayList<>();
+        deadNodes.add(row);
+        it.remove();
+      } else {
+        // remove the changed row from the matrix
+        if (row == changedRow) {
+          rowToInsert = row;
+          it.remove();
+        }
+      }
+    }
+    if (rowToInsert == null) {
+      throw new RuntimeException("the changed row is missing from the matrix! " + changedRow);
+    }
+
+    if (!clusterPreferences.isEmpty()) {
+      //this is to set the approximate value according to the precision
+      ArrayList<Row> tmpMatrix = new ArrayList<>(matrix);
+      Row[] lastComparison = new Row[2];
+      for (Preference p : clusterPreferences) {
+        try {
+          tmpMatrix.sort((r1, r2) -> {
+            lastComparison[0] = r1;
+            lastComparison[1] = r2;
+            return p.compare(r1, r2, false);
+          });
+        } catch (Exception e) {
+          try {
+            Map m = Collections.singletonMap("diagnostics", (MapWriter) ew -> {
+              PolicyHelper.writeNodes(ew, matrixCopy);
+              ew.put("config", matrix.get(0).session.getPolicy());
+            });
+            log.error("Exception! prefs = {}, recent r1 = {}, r2 = {}, matrix = {}",
+                clusterPreferences,
+                lastComparison[0].node,
+                lastComparison[1].node,
+                Utils.writeJson(m, new StringWriter(), true).toString());
+          } catch (IOException e1) {
+            //
+          }
+          throw new RuntimeException(e.getMessage());
+        }
+        p.setApproxVal(tmpMatrix);
+      }
+      // the tmpMatrix was needed only to set the approximate values, now we sort the real matrix
+      // recursing through each preference
+      matrix.sort((Row r1, Row r2) -> {
+        int result = clusterPreferences.get(0).compare(r1, r2, true);
+        if (result == 0) result = clusterPreferences.get(0).compare(r1, r2, false);
+        return result;
+      });
+
+      if(deadNodes != null){
+        for (Row deadNode : deadNodes) {
+          matrix.add(0, deadNode);
+        }
+      }
+    }
+  }
+
   /**
    * Insert the collection name into the clauses where collection is not specified
    */
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaVariable.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaVariable.java
index 675382a..d31bd2d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaVariable.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/ReplicaVariable.java
@@ -33,20 +33,32 @@ class ReplicaVariable extends VariableBase {
   public static final String REPLICASCOUNT = "relevantReplicas";
 
 
+  private static final class ReplicaCounter {
+    final String collection, shard;
+    final Clause clause;
+
+    ReplicaCounter(String collection, String shard, Clause clause) {
+      this.collection = collection;
+      this.shard = shard;
+      this.clause = clause;
+    }
 
+    int calculate(Row row) {
+      int[] result = new int[1];
+      row.forEachReplica(collection, replicaInfo -> {
+        if (clause.isMatch(replicaInfo, collection, shard))
+          result[0]++;
+      });
+      return result[0];
+    }
+  }
 
   static int getRelevantReplicasCount(Policy.Session session, Condition cv, String collection, String shard) {
     int totalReplicasOfInterest = 0;
     Clause clause = cv.getClause();
+    ReplicaCounter counter = new ReplicaCounter(collection, shard, clause);
     for (Row row : session.matrix) {
-      Integer perShardCount = row.computeCacheIfAbsent(collection, shard, REPLICASCOUNT, cv.clause, o -> {
-        int[] result = new int[1];
-        row.forEachReplica(collection, replicaInfo -> {
-          if (clause.isMatch(replicaInfo, collection, shard))
-            result[0]++;
-        });
-        return result[0];
-      });
+      Integer perShardCount = row.computeCacheIfAbsent(collection, shard, REPLICASCOUNT, cv.clause, o -> counter.calculate(row));
       if (perShardCount != null)
         totalReplicasOfInterest += perShardCount;
     }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
index bbaabf2..9dc7fec 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Suggester.java
@@ -329,9 +329,7 @@ public abstract class Suggester implements MapWriter {
   List<Violation> testChangedMatrix(boolean executeInStrictMode, Row changedRow, Policy.Session session) {
     if (this.deviations != null) this.lastBestDeviation = this.deviations;
     this.deviations = null;
-    if (changedRow != null) {
-      Policy.setApproxValuesAndSortNodes(session.getPolicy().clusterPreferences, session.matrix);
-    }
+    Policy.setApproxValuesAndSortNodes(session.getPolicy().clusterPreferences, session.matrix);
     List<Violation> errors = new ArrayList<>();
     for (Clause clause : session.expandedClauses) {
       Clause originalClause = clause.derivedFrom == null ? clause : clause.derivedFrom;