You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2020/06/10 11:14:21 UTC
[lucene-solr] branch master updated: SOLR-14347: fix cached session
update to not depend on Zookeeper state (#1542)
This is an automated email from the ASF dual-hosted git repository.
noble pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/master by this push:
new d4f7c90 SOLR-14347: fix cached session update to not depend on Zookeeper state (#1542)
d4f7c90 is described below
commit d4f7c90bab3d3dd76c066aeb598544cf8250bac8
Author: murblanc <43...@users.noreply.github.com>
AuthorDate: Wed Jun 10 13:13:55 2020 +0200
SOLR-14347: fix cached session update to not depend on Zookeeper state (#1542)
SOLR-14347: fix cached session update to not depend on Zookeeper state
---
.../client/solrj/cloud/autoscaling/Policy.java | 89 ++++++++++++++++++++--
.../solrj/cloud/autoscaling/PolicyHelper.java | 34 ++++++---
.../solr/client/solrj/cloud/autoscaling/Row.java | 25 ++++--
3 files changed, 125 insertions(+), 23 deletions(-)
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
index 0f17306..164f827 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Policy.java
@@ -382,7 +382,7 @@ public class Policy implements MapWriter {
}
public Session createSession(SolrCloudManager cloudManager) {
- return new Session(cloudManager, this, null);
+ return createSession(cloudManager, null);
}
public Session createSession(SolrCloudManager cloudManager, Transaction tx) {
@@ -550,14 +550,18 @@ public class Policy implements MapWriter {
final SolrCloudManager cloudManager;
final List<Row> matrix;
final NodeStateProvider nodeStateProvider;
- Set<String> collections = new HashSet<>();
+ final Set<String> collections;
final Policy policy;
List<Clause> expandedClauses;
List<Violation> violations = new ArrayList<>();
Transaction transaction;
+ /**
+ * This constructor creates a Session from the current Zookeeper collection, replica and node states.
+ */
Session(SolrCloudManager cloudManager, Policy policy, Transaction transaction) {
+ collections = new HashSet<>();
this.transaction = transaction;
this.policy = policy;
ClusterState state = null;
@@ -605,37 +609,106 @@ public class Policy implements MapWriter {
applyRules();
}
+ /**
+ * Creates a new Session and updates the Rows in the internal matrix to reference this session.
+ */
private Session(List<String> nodes, SolrCloudManager cloudManager,
- List<Row> matrix, List<Clause> expandedClauses,
+ List<Row> matrix, Set<String> collections, List<Clause> expandedClauses,
NodeStateProvider nodeStateProvider, Policy policy, Transaction transaction) {
this.transaction = transaction;
this.policy = policy;
this.nodes = nodes;
this.cloudManager = cloudManager;
+ this.collections = collections;
this.matrix = matrix;
this.expandedClauses = expandedClauses;
this.nodeStateProvider = nodeStateProvider;
for (Row row : matrix) row.session = this;
}
+ /**
+ * Given a session (this one), creates a new one for placement simulations that retains all the relevant information,
+ * whether or not that info already made it to Zookeeper.
+ */
+ public Session cloneToNewSession(SolrCloudManager cloudManager) {
+ NodeStateProvider nodeStateProvider = cloudManager.getNodeStateProvider();
+ ClusterStateProvider clusterStateProvider = cloudManager.getClusterStateProvider();
+
+ List<String> nodes = new ArrayList<>(clusterStateProvider.getLiveNodes());
+
+ // Copy all collections from old session, even those not yet in ZK state
+ Set<String> collections = new HashSet<>(this.collections);
+
+ // (shallow) copy the expanded clauses
+ List<Clause> expandedClauses = new ArrayList<>(this.expandedClauses);
+
+ List<Row> matrix = new ArrayList<>(nodes.size());
+ Map<String, Row> copyNodes = new HashMap<>();
+ for (Row oldRow: this.matrix) {
+ copyNodes.put(oldRow.node, oldRow.copy());
+ }
+ for (String node : nodes) {
+ // Do we have a row for that node in this session? If yes, reuse without trying to fetch from cluster state (latest changes might not be there)
+ Row newRow = copyNodes.get(node);
+ if (newRow == null) {
+ // Dealing with a node that doesn't exist in this Session. Need to create related data from scratch.
+ // We pass null for the Session on purpose. The current (this) session is not the correct one for this Row.
+ // The correct session will be set when we build the new Session instance at the end of this method.
+ newRow = new Row(node, this.policy.getParams(), this.policy.getPerReplicaAttributes(), null, nodeStateProvider, cloudManager);
+ // Get info for collections on that node
+ Set<String> collectionsOnNewNode = nodeStateProvider.getReplicaInfo(node, Collections.emptyList()).keySet();
+ collections.addAll(collectionsOnNewNode);
+
+ // Adjust policies to take into account new collections
+ for (String collection : collectionsOnNewNode) {
+ // We pass this.policy but it is not modified, so it will not impact this session being cloned
+ addClausesForCollection(this.policy, expandedClauses, clusterStateProvider, collection);
+ }
+ }
+ matrix.add(newRow);
+ }
+
+ if (nodes.size() > 0) {
+ //if any collection has 'withCollection' irrespective of the node, the NodeStateProvider returns a map value
+ Map<String, Object> vals = nodeStateProvider.getNodeValues(nodes.get(0), Collections.singleton("withCollection"));
+ if (!vals.isEmpty() && vals.get("withCollection") != null) {
+ Map<String, String> withCollMap = (Map<String, String>) vals.get("withCollection");
+ if (!withCollMap.isEmpty()) {
+ Clause withCollClause = new Clause((Map<String,Object>)Utils.fromJSONString("{withCollection:'*' , node: '#ANY'}") ,
+ new Condition(NODE.tagName, "#ANY", Operand.EQUAL, null, null),
+ new Condition(WITH_COLLECTION.tagName,"*" , Operand.EQUAL, null, null), true, null, false
+ );
+ expandedClauses.add(withCollClause);
+ }
+ }
+ }
+
+ Collections.sort(expandedClauses);
+
+ Session newSession = new Session(nodes, cloudManager, matrix, collections, expandedClauses,
+ nodeStateProvider, this.policy, this.transaction);
+ newSession.applyRules();
+
+ return newSession;
+ }
void addClausesForCollection(ClusterStateProvider stateProvider, String collection) {
addClausesForCollection(policy, expandedClauses, stateProvider, collection);
}
- public static void addClausesForCollection(Policy policy, List<Clause> clauses, ClusterStateProvider stateProvider, String c) {
- String p = stateProvider.getPolicyNameByCollection(c);
+ public static void addClausesForCollection(Policy policy, List<Clause> clauses, ClusterStateProvider stateProvider, String collectionName) {
+ String p = stateProvider.getPolicyNameByCollection(collectionName);
if (p != null) {
List<Clause> perCollPolicy = policy.getPolicies().get(p);
if (perCollPolicy == null) {
return;
}
}
- clauses.addAll(mergePolicies(c, policy.getPolicies().getOrDefault(p, emptyList()), policy.getClusterPolicy()));
+ clauses.addAll(mergePolicies(collectionName, policy.getPolicies().getOrDefault(p, emptyList()), policy.getClusterPolicy()));
}
Session copy() {
- return new Session(nodes, cloudManager, getMatrixCopy(), expandedClauses, nodeStateProvider, policy, transaction);
+ return new Session(nodes, cloudManager, getMatrixCopy(), new HashSet<>(), expandedClauses, nodeStateProvider, policy, transaction);
}
public Row getNode(String node) {
@@ -645,7 +718,7 @@ public class Policy implements MapWriter {
List<Row> getMatrixCopy() {
return matrix.stream()
- .map(row -> row.copy(this))
+ .map(row -> row.copy())
.collect(Collectors.toList());
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
index be2f65d..2e176b5 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
@@ -121,17 +121,21 @@ public class PolicyHelper {
policyMapping.set(optionalPolicyMapping);
SessionWrapper sessionWrapper = null;
- Policy.Session origSession = null;
+
try {
try {
SESSION_WRAPPPER_REF.set(sessionWrapper = getSession(delegatingManager));
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "unable to get autoscaling policy session", e);
-
}
- origSession = sessionWrapper.session;
+
+ Policy.Session origSession = sessionWrapper.session;
// new session needs to be created to avoid side-effects from per-collection policies
- Policy.Session session = new Policy.Session(delegatingManager, origSession.policy, origSession.transaction);
+ // TODO: refactor so cluster state cache is separate from storage of policies to avoid per cluster vs per collection interactions
+ // Need a Session that has all previous history of the original session, NOT filtered by what's present or not in Zookeeper
+ // (as does constructor Session(SolrCloudManager, Policy, Transaction)).
+ Policy.Session newSession = origSession.cloneToNewSession(delegatingManager);
+
Map<String, Double> diskSpaceReqd = new HashMap<>();
try {
DocCollection coll = cloudManager.getClusterStateProvider().getCollection(collName);
@@ -165,7 +169,7 @@ public class PolicyHelper {
int idx = 0;
for (Map.Entry<Replica.Type, Integer> e : typeVsCount.entrySet()) {
for (int i = 0; i < e.getValue(); i++) {
- Suggester suggester = session.getSuggester(ADDREPLICA)
+ Suggester suggester = newSession.getSuggester(ADDREPLICA)
.hint(Hint.REPLICATYPE, e.getKey())
.hint(Hint.COLL_SHARD, new Pair<>(collName, shardName));
if (nodesList != null) {
@@ -184,18 +188,23 @@ public class PolicyHelper {
, handleExp(log, "", () -> Utils.writeJson(getDiagnostics(sessionCopy), new StringWriter(), true).toString())); // logOk
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, " No node can satisfy the rules " +
- Utils.toJSONString(Utils.getDeepCopy(session.expandedClauses, 4, true) + " More details from logs in node : "
+ Utils.toJSONString(Utils.getDeepCopy(newSession.expandedClauses, 4, true) + " More details from logs in node : "
+ Utils.getMDCNode() + ", errorId : " + errorId));
}
- session = suggester.getSession();
+ newSession = suggester.getSession();
positions.add(new ReplicaPosition(shardName, ++idx, e.getKey(), op.getParams().get(NODE)));
}
}
}
+
+ // We're happy with the updated session based on the original one, so let's update what the wrapper would hand
+ // to the next computation that wants a session.
+ sessionWrapper.update(newSession);
} finally {
policyMapping.remove();
+ // We mark the wrapper (and its session) as being available to others.
if (sessionWrapper != null) {
- sessionWrapper.returnSession(origSession);
+ sessionWrapper.returnSession();
}
}
return positions;
@@ -600,16 +609,21 @@ public class PolicyHelper {
*/
public void returnSession(Policy.Session session) {
this.update(session);
+ this.returnSession();
+ }
+
+ /**
+ * Returns this session for later use without updating the internal Session, for cases where it's easier to update it separately
+ */
+ public void returnSession() {
refCount.incrementAndGet();
ref.returnSession(this);
-
}
//all ops are executed now it can be destroyed
public void release() {
refCount.decrementAndGet();
ref.release(this);
-
}
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java
index 2cc48ea..17b494f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/Row.java
@@ -27,12 +27,15 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.apache.solr.client.solrj.cloud.NodeStateProvider;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkStateReader;
@@ -61,14 +64,27 @@ public class Row implements MapWriter {
Map perCollCache;
public Row(String node, List<Pair<String, Variable.Type>> params, List<String> perReplicaAttributes, Policy.Session session) {
+ this(node, params, perReplicaAttributes, session, session.nodeStateProvider, session.cloudManager);
+ }
+
+ /**
+ * Constructor that allows explicitly passing a {@link NodeStateProvider} and a {@link SolrCloudManager} in order not to
+ * use those obtained through the passed <code>session</code>.
+ * <p>Note the resulting row has a {@link Policy.Session} that may not be consistent with the rest of the Row's state. When rows are copied
+ * as part of a {@link Policy.Session} copy, the copied rows' sessions are eventually updated in
+ * {@link org.apache.solr.client.solrj.cloud.autoscaling.Policy.Session#Session(List, SolrCloudManager, List, Set, List, NodeStateProvider, Policy, Policy.Transaction)}
+ * once the new {@link Policy.Session} instance is available.</p>
+ */
+ Row(String node, List<Pair<String, Variable.Type>> params, List<String> perReplicaAttributes, Policy.Session session,
+ NodeStateProvider nsp, SolrCloudManager cloudManager) {
this.session = session;
- collectionVsShardVsReplicas = session.nodeStateProvider.getReplicaInfo(node, perReplicaAttributes);
+ collectionVsShardVsReplicas = nsp.getReplicaInfo(node, perReplicaAttributes);
if (collectionVsShardVsReplicas == null) collectionVsShardVsReplicas = new HashMap<>();
this.node = node;
cells = new Cell[params.size()];
- isLive = session.cloudManager.getClusterStateProvider().getLiveNodes().contains(node);
+ isLive = cloudManager.getClusterStateProvider().getLiveNodes().contains(node);
List<String> paramNames = params.stream().map(Pair::first).collect(Collectors.toList());
- Map<String, Object> vals = isLive ? session.nodeStateProvider.getNodeValues(node, paramNames) : Collections.emptyMap();
+ Map<String, Object> vals = isLive ? nsp.getNodeValues(node, paramNames) : Collections.emptyMap();
for (int i = 0; i < params.size(); i++) {
Pair<String, Variable.Type> pair = params.get(i);
cells[i] = new Cell(i, pair.first(), Clause.validate(pair.first(), vals.get(pair.first()), false), null, pair.second(), this);
@@ -80,7 +96,6 @@ public class Row implements MapWriter {
isAlreadyCopied = true;
}
-
public static final Map<String, CacheEntry> cacheStats = new HashMap<>();
static class CacheEntry implements MapWriter {
@@ -175,7 +190,7 @@ public class Row implements MapWriter {
ew.put("attributes", Arrays.asList(cells));
}
- Row copy(Policy.Session session) {
+ Row copy() {
return new Row(node, cells, anyValueMissing, collectionVsShardVsReplicas, isLive, session, this.globalCache, this.perCollCache);
}