You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by is...@apache.org on 2021/02/10 15:28:33 UTC
[lucene-solr] branch jira/solr15138 updated: SOLR-15138: Refreshing
Overseer's cluster state updater after PRS collections are created directly
without going via queues
This is an automated email from the ASF dual-hosted git repository.
ishan pushed a commit to branch jira/solr15138
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/jira/solr15138 by this push:
new 5aa04dc SOLR-15138: Refreshing Overseer's cluster state updater after PRS collections are created directly without going via queues
5aa04dc is described below
commit 5aa04dce5f99b51ddf72b150ecfb3dc9f10f166c
Author: Ishan Chattopadhyaya <is...@apache.org>
AuthorDate: Wed Feb 10 20:58:09 2021 +0530
SOLR-15138: Refreshing Overseer's cluster state updater after PRS collections are created directly without going via queues
---
solr/core/src/java/org/apache/solr/cloud/Overseer.java | 12 ++++++++++--
.../solr/cloud/api/collections/CreateCollectionCmd.java | 16 ++++++++++++----
.../org/apache/solr/common/cloud/PerReplicaStates.java | 14 +++++++++-----
3 files changed, 31 insertions(+), 11 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 7b3ba2c..497806f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -159,7 +159,7 @@ public class Overseer implements SolrCloseable {
*
* <p>The cluster state updater is a single thread dequeueing and executing requests.</p>
*/
- private class ClusterStateUpdater implements Runnable, Closeable {
+ public class ClusterStateUpdater implements Runnable, Closeable {
private final ZkStateReader reader;
private final SolrZkClient zkClient;
@@ -206,6 +206,8 @@ public class Overseer implements SolrCloseable {
return workQueue.getZkStats();
}
+ private boolean refreshClusterState = false;
+
@Override
public void run() {
MDCLoggingContext.setNode(zkController.getNodeName() );
@@ -222,7 +224,8 @@ public class Overseer implements SolrCloseable {
try {
ZkStateWriter zkStateWriter = null;
ClusterState clusterState = null;
- boolean refreshClusterState = true; // let's refresh in the first iteration
+ refreshClusterState = true; // let's refresh in the first iteration
+
// we write updates in batch, but if an exception is thrown when writing the new clusterstate,
// we cannot be sure which message is the bad one, therefore we will re-process the nodes one by one
int fallbackQueueSize = Integer.MAX_VALUE;
@@ -355,6 +358,11 @@ public class Overseer implements SolrCloseable {
}
}
+ // nocommit: javadocs
+ public void refreshClusterState() {
+ refreshClusterState = true;
+ }
+
// Return true whenever the exception thrown by ZkStateWriter corresponds
// to an invalid state or 'bad' message (in this case, we should remove that message from the queue)
private boolean isBadMessage(Exception e) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index d5a9112..dc1e674 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -144,7 +144,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
createCollectionZkNode(stateManager, collectionName, collectionParams);
- if(isPrs) {
+ if (isPrs) {
ZkWriteCommand command = new ClusterStateMutator(ocmh.cloudManager).createCollection(clusterState, message);
byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
ocmh.zkStateReader.getZkClient().create(collectionPath, data, CreateMode.PERSISTENT, true);
@@ -218,11 +218,15 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
ZkStateReader.NODE_NAME_PROP, nodeName,
ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(),
CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
- if(isPrs) {
+ if (isPrs) {
ZkWriteCommand command = new SliceMutator(ocmh.cloudManager).addReplica(clusterState, props);
byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
-// log.info("collection updated : {}", new String(data, StandardCharsets.UTF_8));
ocmh.zkStateReader.getZkClient().setData(collectionPath, data, true);
+
+ // Since we're directly updating the state here, instead of doing it via a queue in the overseer,
+ // we need to make sure that the cluster state updater used in the Overseer can see this update
+ // upon refreshing itself
+ ((Overseer.ClusterStateUpdater) ocmh.overseer.getUpdaterThread().getThread()).refreshClusterState();
clusterState = clusterState.copyWith(collectionName, command.collection);
newColl = command.collection;
} else {
@@ -299,8 +303,12 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
log.info("Cleaned up artifacts for failed create collection for [{}]", collectionName);
throw new SolrException(ErrorCode.BAD_REQUEST, "Underlying core creation failed while creating collection: " + collectionName);
} else {
-
log.debug("Finished create command on all shards for collection: {}", collectionName);
+ if (isPrs) {
+ // Since we created this collection without some of the sub-operations going through the overseer queues,
+ // we need to make sure that the cluster state in the overseer can see this collection upon refreshing itself
+ ((Overseer.ClusterStateUpdater) ocmh.overseer.getUpdaterThread().getThread()).refreshClusterState();
+ }
// Emit a warning about production use of data driven functionality
boolean defaultConfigSetUsed = message.getStr(COLL_CONF) == null ||
message.getStr(COLL_CONF).equals(DEFAULT_CONFIGSET_NAME);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
index be40066..71c8c89 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.apache.solr.cluster.api.SimpleMap;
@@ -67,7 +68,7 @@ public class PerReplicaStates implements ReflectMapWriter {
@JsonProperty
public final SimpleMap<State> states;
- private volatile Boolean allActive;
+ private volatile AtomicBoolean allActive = null;
/**
* Construct with data read from ZK
@@ -94,18 +95,21 @@ public class PerReplicaStates implements ReflectMapWriter {
}
- /** Check and return if all replicas are ACTIVE
+ /**
+ * Check and return if all replicas are ACTIVE
*/
public boolean allActive() {
- if (this.allActive != null) return allActive;
+ if (this.allActive != null) return allActive.get();
boolean[] result = new boolean[]{true};
states.forEachEntry((r, s) -> {
if (s.state != Replica.State.ACTIVE) result[0] = false;
});
- return this.allActive = result[0];
+ this.allActive.set(result[0]);
+ return this.allActive.get();
}
- /**Get the changed replicas
+ /**
+ * Get the changed replicas
*/
public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {
Set<String> result = new HashSet<>();