You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2020/06/10 11:14:21 UTC

[lucene-solr] branch master updated: SOLR-14347: fix cached session update to not depend on Zookeeper state (#1542)

This is an automated email from the ASF dual-hosted git repository.

noble pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new d4f7c90  SOLR-14347: fix cached session update to not depend on Zookeeper state (#1542)
d4f7c90 is described below

commit d4f7c90bab3d3dd76c066aeb598544cf8250bac8
Author: murblanc <43...@users.noreply.github.com>
AuthorDate: Wed Jun 10 13:13:55 2020 +0200

    SOLR-14347: fix cached session update to not depend on Zookeeper state (#1542)
    
    SOLR-14347: fix cached session update to not depend on Zookeeper state
---
 .../client/solrj/cloud/autoscaling/Policy.java     | 89 ++++++++++++++++++++--
 .../solrj/cloud/autoscaling/PolicyHelper.java      | 34 ++++++---
 .../solr/client/solrj/cloud/autoscaling/Row.java   | 25 ++++--
 3 files changed, 125 insertions(+), 23 deletions(-)

diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
index 0f17306..164f827 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
@@ -382,7 +382,7 @@ public class Policy implements MapWriter {
   }
 
   public Session createSession(SolrCloudManager cloudManager) {
-    return new Session(cloudManager, this, null);
+    return createSession(cloudManager, null);
   }
 
   public Session createSession(SolrCloudManager cloudManager, Transaction tx) {
@@ -550,14 +550,18 @@ public class Policy implements MapWriter {
     final SolrCloudManager cloudManager;
     final List<Row> matrix;
     final NodeStateProvider nodeStateProvider;
-    Set<String> collections = new HashSet<>();
+    final Set<String> collections;
     final Policy policy;
     List<Clause> expandedClauses;
     List<Violation> violations = new ArrayList<>();
     Transaction transaction;
 
 
+    /**
+     * This constructor creates a Session from the current Zookeeper collection, replica and node states.
+     */
     Session(SolrCloudManager cloudManager, Policy policy, Transaction transaction) {
+      collections = new HashSet<>();
       this.transaction = transaction;
       this.policy = policy;
       ClusterState state = null;
@@ -605,37 +609,106 @@ public class Policy implements MapWriter {
       applyRules();
     }
 
+    /**
+     * Creates a new Session and updates the Rows in the internal matrix to reference this session.
+     */
     private Session(List<String> nodes, SolrCloudManager cloudManager,
-                   List<Row> matrix, List<Clause> expandedClauses,
+                   List<Row> matrix, Set<String> collections, List<Clause> expandedClauses,
                    NodeStateProvider nodeStateProvider, Policy policy, Transaction transaction) {
       this.transaction = transaction;
       this.policy = policy;
       this.nodes = nodes;
       this.cloudManager = cloudManager;
+      this.collections = collections;
       this.matrix = matrix;
       this.expandedClauses = expandedClauses;
       this.nodeStateProvider = nodeStateProvider;
       for (Row row : matrix) row.session = this;
     }
 
+    /**
+     * Given a session (this one), creates a new one for placement simulations that retains all the relevant information,
+     * whether or not that info already made it to Zookeeper.
+     */
+    public Session cloneToNewSession(SolrCloudManager cloudManager) {
+      NodeStateProvider nodeStateProvider = cloudManager.getNodeStateProvider();
+      ClusterStateProvider clusterStateProvider = cloudManager.getClusterStateProvider();
+
+      List<String> nodes = new ArrayList<>(clusterStateProvider.getLiveNodes());
+
+      // Copy all collections from old session, even those not yet in ZK state
+      Set<String> collections = new HashSet<>(this.collections);
+
+      // (shallow) copy the expanded clauses
+      List<Clause> expandedClauses = new ArrayList<>(this.expandedClauses);
+
+      List<Row> matrix = new ArrayList<>(nodes.size());
+      Map<String, Row> copyNodes = new HashMap<>();
+      for (Row oldRow: this.matrix) {
+        copyNodes.put(oldRow.node, oldRow.copy());
+      }
+      for (String node : nodes) {
+        // Do we have a row for that node in this session? If yes, reuse without trying to fetch from cluster state (latest changes might not be there)
+        Row newRow = copyNodes.get(node);
+        if (newRow == null) {
+          // Dealing with a node that doesn't exist in this Session. Need to create related data from scratch.
+          // We pass null for the Session on purpose. The current (this) session is not the correct one for this Row.
+          // The correct session will be set when we build the new Session instance at the end of this method.
+          newRow = new Row(node, this.policy.getParams(), this.policy.getPerReplicaAttributes(), null, nodeStateProvider, cloudManager);
+          // Get info for collections on that node
+          Set<String> collectionsOnNewNode = nodeStateProvider.getReplicaInfo(node, Collections.emptyList()).keySet();
+          collections.addAll(collectionsOnNewNode);
+
+          // Adjust policies to take into account new collections
+          for (String collection : collectionsOnNewNode) {
+            // We pass this.policy but it is not modified so will not impact this session being cloned
+            addClausesForCollection(this.policy, expandedClauses, clusterStateProvider, collection);
+          }
+        }
+        matrix.add(newRow);
+      }
+
+      if (nodes.size() > 0) {
+        //if any collection has 'withCollection' irrespective of the node, the NodeStateProvider returns a map value
+        Map<String, Object> vals = nodeStateProvider.getNodeValues(nodes.get(0), Collections.singleton("withCollection"));
+        if (!vals.isEmpty() && vals.get("withCollection") != null) {
+          Map<String, String> withCollMap = (Map<String, String>) vals.get("withCollection");
+          if (!withCollMap.isEmpty()) {
+            Clause withCollClause = new Clause((Map<String,Object>)Utils.fromJSONString("{withCollection:'*' , node: '#ANY'}") ,
+                new Condition(NODE.tagName, "#ANY", Operand.EQUAL, null, null),
+                new Condition(WITH_COLLECTION.tagName,"*" , Operand.EQUAL, null, null), true, null, false
+            );
+            expandedClauses.add(withCollClause);
+          }
+        }
+      }
+
+      Collections.sort(expandedClauses);
+
+      Session newSession = new Session(nodes, cloudManager, matrix, collections, expandedClauses,
+          nodeStateProvider, this.policy, this.transaction);
+      newSession.applyRules();
+
+      return newSession;
+    }
 
     void addClausesForCollection(ClusterStateProvider stateProvider, String collection) {
       addClausesForCollection(policy, expandedClauses, stateProvider, collection);
     }
 
-    public static void addClausesForCollection(Policy policy, List<Clause> clauses, ClusterStateProvider stateProvider, String c) {
-      String p = stateProvider.getPolicyNameByCollection(c);
+    public static void addClausesForCollection(Policy policy, List<Clause> clauses, ClusterStateProvider stateProvider, String collectionName) {
+      String p = stateProvider.getPolicyNameByCollection(collectionName);
       if (p != null) {
         List<Clause> perCollPolicy = policy.getPolicies().get(p);
         if (perCollPolicy == null) {
           return;
         }
       }
-      clauses.addAll(mergePolicies(c, policy.getPolicies().getOrDefault(p, emptyList()), policy.getClusterPolicy()));
+      clauses.addAll(mergePolicies(collectionName, policy.getPolicies().getOrDefault(p, emptyList()), policy.getClusterPolicy()));
     }
 
     Session copy() {
-      return new Session(nodes, cloudManager, getMatrixCopy(), expandedClauses, nodeStateProvider, policy, transaction);
+      return new Session(nodes, cloudManager, getMatrixCopy(), new HashSet<>(), expandedClauses,  nodeStateProvider, policy, transaction);
     }
 
     public Row getNode(String node) {
@@ -645,7 +718,7 @@ public class Policy implements MapWriter {
 
     List<Row> getMatrixCopy() {
       return matrix.stream()
-          .map(row -> row.copy(this))
+          .map(row -> row.copy())
           .collect(Collectors.toList());
     }
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
index be2f65d..2e176b5 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
@@ -121,17 +121,21 @@ public class PolicyHelper {
 
     policyMapping.set(optionalPolicyMapping);
     SessionWrapper sessionWrapper = null;
-    Policy.Session origSession = null;
+
     try {
       try {
         SESSION_WRAPPPER_REF.set(sessionWrapper = getSession(delegatingManager));
       } catch (Exception e) {
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "unable to get autoscaling policy session", e);
-
       }
-      origSession = sessionWrapper.session;
+
+      Policy.Session origSession = sessionWrapper.session;
       // new session needs to be created to avoid side-effects from per-collection policies
-      Policy.Session session = new Policy.Session(delegatingManager, origSession.policy, origSession.transaction);
+      // TODO: refactor so cluster state cache is separate from storage of policies to avoid per cluster vs per collection interactions
+      // Need a Session that has all previous history of the original session, NOT filtered by what's present or not in Zookeeper
+      // (as does constructor Session(SolrCloudManager, Policy, Transaction)).
+      Policy.Session newSession = origSession.cloneToNewSession(delegatingManager);
+
       Map<String, Double> diskSpaceReqd = new HashMap<>();
       try {
         DocCollection coll = cloudManager.getClusterStateProvider().getCollection(collName);
@@ -165,7 +169,7 @@ public class PolicyHelper {
         int idx = 0;
         for (Map.Entry<Replica.Type, Integer> e : typeVsCount.entrySet()) {
           for (int i = 0; i < e.getValue(); i++) {
-            Suggester suggester = session.getSuggester(ADDREPLICA)
+            Suggester suggester = newSession.getSuggester(ADDREPLICA)
                 .hint(Hint.REPLICATYPE, e.getKey())
                 .hint(Hint.COLL_SHARD, new Pair<>(collName, shardName));
             if (nodesList != null) {
@@ -184,18 +188,23 @@ public class PolicyHelper {
                   , handleExp(log, "", () -> Utils.writeJson(getDiagnostics(sessionCopy), new StringWriter(), true).toString())); // logOk
 
               throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, " No node can satisfy the rules " +
-                  Utils.toJSONString(Utils.getDeepCopy(session.expandedClauses, 4, true) + " More details from logs in node : "
+                  Utils.toJSONString(Utils.getDeepCopy(newSession.expandedClauses, 4, true) + " More details from logs in node : "
                       + Utils.getMDCNode() + ", errorId : " + errorId));
             }
-            session = suggester.getSession();
+            newSession = suggester.getSession();
             positions.add(new ReplicaPosition(shardName, ++idx, e.getKey(), op.getParams().get(NODE)));
           }
         }
       }
+
+      // We're happy with the updated session based on the original one, so let's update what the wrapper would hand
+      // to the next computation that wants a session.
+      sessionWrapper.update(newSession);
     } finally {
       policyMapping.remove();
+      // We mark the wrapper (and its session) as being available to others.
       if (sessionWrapper != null) {
-        sessionWrapper.returnSession(origSession);
+        sessionWrapper.returnSession();
       }
     }
     return positions;
@@ -600,16 +609,21 @@ public class PolicyHelper {
      */
     public void returnSession(Policy.Session session) {
       this.update(session);
+      this.returnSession();
+    }
+
+    /**
+     * Return this wrapper for later use without updating the internal Session, for cases where it's easier to update the Session separately.
+     */
+    public void returnSession() {
       refCount.incrementAndGet();
       ref.returnSession(this);
-
     }
 
     //all ops are executed now it can be destroyed
     public void release() {
       refCount.decrementAndGet();
       ref.release(this);
-
     }
   }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java
index 2cc48ea..17b494f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java
@@ -27,12 +27,15 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import org.apache.solr.client.solrj.cloud.NodeStateProvider;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ZkStateReader;
@@ -61,14 +64,27 @@ public class Row implements MapWriter {
   Map perCollCache;
 
   public Row(String node, List<Pair<String, Variable.Type>> params, List<String> perReplicaAttributes, Policy.Session session) {
+    this(node, params, perReplicaAttributes, session, session.nodeStateProvider, session.cloudManager);
+  }
+
+  /**
+   * Constructor that allows explicitly passing a {@link NodeStateProvider} and a {@link SolrCloudManager} in order not to
+   * use those obtained through the passed <code>session</code>.
+   * <p>Note the resulting row has a {@link Policy.Session} that may not be consistent with the rest of the Row's state. When rows are copied
+   * as part of a {@link Policy.Session} copy, the copied rows' sessions are eventually updated in
+   * {@link org.apache.solr.client.solrj.cloud.autoscaling.Policy.Session#Session(List, SolrCloudManager, List, Set, List, NodeStateProvider, Policy, Policy.Transaction)}
+   * once the new {@link Policy.Session} instance is available.</p>
+   */
+  Row(String node, List<Pair<String, Variable.Type>> params, List<String> perReplicaAttributes, Policy.Session session,
+      NodeStateProvider nsp, SolrCloudManager cloudManager) {
     this.session = session;
-    collectionVsShardVsReplicas = session.nodeStateProvider.getReplicaInfo(node, perReplicaAttributes);
+    collectionVsShardVsReplicas = nsp.getReplicaInfo(node, perReplicaAttributes);
     if (collectionVsShardVsReplicas == null) collectionVsShardVsReplicas = new HashMap<>();
     this.node = node;
     cells = new Cell[params.size()];
-    isLive = session.cloudManager.getClusterStateProvider().getLiveNodes().contains(node);
+    isLive = cloudManager.getClusterStateProvider().getLiveNodes().contains(node);
     List<String> paramNames = params.stream().map(Pair::first).collect(Collectors.toList());
-    Map<String, Object> vals = isLive ? session.nodeStateProvider.getNodeValues(node, paramNames) : Collections.emptyMap();
+    Map<String, Object> vals = isLive ? nsp.getNodeValues(node, paramNames) : Collections.emptyMap();
     for (int i = 0; i < params.size(); i++) {
       Pair<String, Variable.Type> pair = params.get(i);
       cells[i] = new Cell(i, pair.first(), Clause.validate(pair.first(), vals.get(pair.first()), false), null, pair.second(), this);
@@ -80,7 +96,6 @@ public class Row implements MapWriter {
     isAlreadyCopied = true;
   }
 
-
   public static final Map<String, CacheEntry> cacheStats = new HashMap<>();
 
   static class CacheEntry implements MapWriter {
@@ -175,7 +190,7 @@ public class Row implements MapWriter {
     ew.put("attributes", Arrays.asList(cells));
   }
 
-  Row copy(Policy.Session session) {
+  Row copy() {
     return new Row(node, cells, anyValueMissing, collectionVsShardVsReplicas, isLive, session, this.globalCache, this.perCollCache);
   }