You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by is...@apache.org on 2021/02/12 06:01:36 UTC
[lucene-solr] branch jira/solr15138_8x updated: General cleanup,
address review comment
This is an automated email from the ASF dual-hosted git repository.
ishan pushed a commit to branch jira/solr15138_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/jira/solr15138_8x by this push:
new 2a2bba1 General cleanup, address review comment
2a2bba1 is described below
commit 2a2bba1c4d1fdbf78f0b671cf4b92bfeb476fff0
Author: Ishan Chattopadhyaya <is...@apache.org>
AuthorDate: Fri Feb 12 11:31:03 2021 +0530
General cleanup, address review comment
---
solr/core/src/java/org/apache/solr/cloud/Overseer.java | 5 +++--
.../org/apache/solr/cloud/RefreshCollectionMessage.java | 8 ++++----
.../solr/cloud/api/collections/CreateCollectionCmd.java | 17 +++++++++++------
.../org/apache/solr/common/cloud/PerReplicaStates.java | 8 +++++---
.../org/apache/solr/common/cloud/ZkStateReader.java | 6 +++---
.../solr/client/solrj/impl/CloudSolrClientTest.java | 2 +-
6 files changed, 27 insertions(+), 19 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 358ae71..a10e8a6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -1027,8 +1027,9 @@ public class Overseer implements SolrCloseable {
getStateUpdateQueue().offer(data);
}
- /**Submit an intra-process message
- * This will be picked up and executed when clusterstate updater thread runs
+ /**
+ * Submit an intra-process message which will be picked up and executed when {@link ClusterStateUpdater}'s
+ * loop runs next time
*/
public void submit(Message message) {
unprocessedMessages.add(message);
diff --git a/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
index 0716ad6..5375322 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
@@ -23,8 +23,8 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
-/**Refresh the Cluster State for a given collection
- *
+/**
+ * Refresh the ClusterState for a given collection
*/
public class RefreshCollectionMessage implements Overseer.Message {
public final Operation operation;
@@ -38,12 +38,12 @@ public class RefreshCollectionMessage implements Overseer.Message {
ClusterState run(ClusterState clusterState, Overseer overseer) throws InterruptedException, KeeperException {
Stat stat = overseer.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collection), null, true);
if (stat == null) {
- //collection does not exist
+ // collection does not exist
return clusterState.copyWith(collection, null);
}
DocCollection coll = clusterState.getCollectionOrNull(collection);
if (coll != null && !coll.isModified(stat.getVersion(), stat.getCversion())) {
- //our state is up to date
+ // our state is up to date
return clusterState;
} else {
coll = ZkStateReader.getCollectionLive(overseer.getZkStateReader(), collection);
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index c430c0d..6c1d51c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -119,7 +119,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
final boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
final String alias = message.getStr(ALIAS, collectionName);
log.info("Create collection {}", collectionName);
- final boolean isPrs = message.getBool(DocCollection.PER_REPLICA_STATE, false);
+ final boolean isPRS = message.getBool(DocCollection.PER_REPLICA_STATE, false);
if (clusterState.hasCollection(collectionName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
}
@@ -179,7 +179,10 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
createCollectionZkNode(stateManager, collectionName, collectionParams);
- if(isPrs) {
+ if (isPRS) {
+ // In case of a PRS collection, create the collection structure directly instead of resubmitting
+ // to the overseer queue.
+ // TODO: Consider doing this for all collections, not just the PRS collections.
ZkWriteCommand command = new ClusterStateMutator(ocmh.cloudManager).createCollection(clusterState, message);
byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
ocmh.zkStateReader.getZkClient().create(collectionPath, data, CreateMode.PERSISTENT, true);
@@ -269,10 +272,12 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
ZkStateReader.NODE_NAME_PROP, nodeName,
ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(),
CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
- if(isPrs) {
+ if (isPRS) {
+ // In case of a PRS collection, execute the ADDREPLICA directly instead of resubmitting
+ // to the overseer queue.
+ // TODO: Consider doing this for all collections, not just the PRS collections.
ZkWriteCommand command = new SliceMutator(ocmh.cloudManager).addReplica(clusterState, props);
byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
-// log.info("collection updated : {}", new String(data, StandardCharsets.UTF_8));
ocmh.zkStateReader.getZkClient().setData(collectionPath, data, true);
clusterState = clusterState.copyWith(collectionName, command.collection);
newColl = command.collection;
@@ -318,7 +323,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
if(!isLegacyCloud) {
// wait for all replica entries to be created
Map<String, Replica> replicas ;
- if(isPrs) {
+ if (isPRS) {
replicas = new ConcurrentHashMap<>();
newColl.getSlices().stream().flatMap(slice -> slice.getReplicas().stream())
.filter(r -> coresToCreate.containsKey(r.getCoreName())) // Only the elements that were asked for...
@@ -337,7 +342,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
shardRequestTracker.processResponses(results, shardHandler, false, null, Collections.emptySet());
@SuppressWarnings({"rawtypes"})
boolean failure = results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0;
- if(isPrs) {
+ if (isPRS) {
TimeOut timeout = new TimeOut(Integer.getInteger("solr.waitToSeeReplicasInStateTimeoutSeconds", 120), TimeUnit.SECONDS, timeSource); // could be a big cluster
PerReplicaStates prs = PerReplicaStates.fetch(collectionPath, ocmh.zkStateReader.getZkClient(), null);
while (!timeout.hasTimedOut()) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
index be40066..4fdfc32 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.apache.solr.cluster.api.SimpleMap;
@@ -67,7 +68,7 @@ public class PerReplicaStates implements ReflectMapWriter {
@JsonProperty
public final SimpleMap<State> states;
- private volatile Boolean allActive;
+ private volatile AtomicBoolean allActive;
/**
* Construct with data read from ZK
@@ -97,12 +98,13 @@ public class PerReplicaStates implements ReflectMapWriter {
/** Check and return if all replicas are ACTIVE
*/
public boolean allActive() {
- if (this.allActive != null) return allActive;
+ if (this.allActive != null) return allActive.get();
boolean[] result = new boolean[]{true};
states.forEachEntry((r, s) -> {
if (s.state != Replica.State.ACTIVE) result[0] = false;
});
- return this.allActive = result[0];
+ this.allActive.set(result[0]);
+ return this.allActive.get();
}
/**Get the changed replicas
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index d9e831e..3ac45b4 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -1400,7 +1400,7 @@ public class ZkStateReader implements SolrCloseable {
}
} catch (NoNodeException e) {
- log.info("{} is deleted, stop watching children", collectionPath);
+ log.debug("{} is deleted, stop watching children", collectionPath);
}
}
}
@@ -1620,7 +1620,7 @@ public class ZkStateReader implements SolrCloseable {
ClusterState.initReplicaStateProvider(() -> {
try {
PerReplicaStates replicaStates = PerReplicaStates.fetch(collectionPath, zkClient, null);
- log.info("per-replica-state ver: {} fetched for initializing {} ", replicaStates.cversion, collectionPath);
+ log.debug("per-replica-state ver: {} fetched for initializing {} ", replicaStates.cversion, collectionPath);
return replicaStates;
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error fetching per-replica-states");
@@ -1764,7 +1764,7 @@ public class ZkStateReader implements SolrCloseable {
v = new CollectionWatch<>();
watchSet.set(true);
}
- log.info("already watching , added to stateWatchers");
+ log.debug("already watching , added to stateWatchers");
v.stateWatchers.add(stateWatcher);
return v;
});
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
index e6920bf..b85499d 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
@@ -1100,7 +1100,7 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
PerReplicaStates prs = PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), cluster.getZkClient(), null);
assertEquals(4, prs.states.size());
- //Now let's do an add replica
+ // Now let's do an add replica
CollectionAdminRequest
.addReplicaToShard(testCollection, "shard1")
.process(cluster.getSolrClient());