You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2021/02/11 13:37:09 UTC
[lucene-solr] branch jira/solr15138_8x updated: porting PRS improvements from master
This is an automated email from the ASF dual-hosted git repository.
noble pushed a commit to branch jira/solr15138_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/jira/solr15138_8x by this push:
new dcd538d porting PRS improvements from master
dcd538d is described below
commit dcd538daf5f04f40c3e4a52f9dd1a9e5ac1c4880
Author: Noble Paul <no...@gmail.com>
AuthorDate: Fri Feb 12 00:36:34 2021 +1100
porting PRS improvements from master
---
.../src/java/org/apache/solr/cloud/Overseer.java | 24 +++++++
.../solr/cloud/RefreshCollectionMessage.java | 58 ++++++++++++++++
.../cloud/api/collections/CreateCollectionCmd.java | 78 +++++++++++++++++-----
.../apache/solr/common/cloud/PerReplicaStates.java | 13 ++++
4 files changed, 156 insertions(+), 17 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 3f52f74..b287f0c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -29,6 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import org.apache.lucene.util.Version;
@@ -95,6 +96,7 @@ public class Overseer implements SolrCloseable {
public static final int NUM_RESPONSES_TO_STORE = 10000;
public static final String OVERSEER_ELECT = "/overseer_elect";
+ private final CopyOnWriteArrayList<Message> unprocessedMessages = new CopyOnWriteArrayList<>();
private SolrMetricsContext solrMetricsContext;
private volatile String metricTag = SolrMetricProducer.getUniqueMetricTag(this, null);
@@ -200,6 +202,13 @@ public class Overseer implements SolrCloseable {
}
// force flush to ZK after each message because there is no fallback if workQueue items
// are removed from workQueue but fail to be written to ZK
+ while (unprocessedMessages.size() > 0) {
+ zkStateWriter.writePendingUpdates();
+ Message m = unprocessedMessages.remove(0);
+ if (m instanceof RefreshCollectionMessage) {
+ clusterState = ((RefreshCollectionMessage) m).run(clusterState, Overseer.this);
+ }
+ }
try {
clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null);
} catch (Exception e) {
@@ -1018,4 +1027,19 @@ public class Overseer implements SolrCloseable {
getStateUpdateQueue().offer(data);
}
+ /** Submits an intra-process {@link Message} by appending it to the in-memory {@code unprocessedMessages} list.
+ * The cluster state updater loop drains that list before processing a queue item; it currently runs only {@link RefreshCollectionMessage} instances and discards any other kind.
+ */
+ public void submit(Message message) {
+ unprocessedMessages.add(message);
+ }
+
+ public interface Message {
+ Operation getOperation();
+
+ enum Operation {
+ REFRESH_COLL;
+ }
+ }
+
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
new file mode 100644
index 0000000..0716ad6
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cloud;
+
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+/** Intra-process Overseer message that refreshes the cluster state entry of one collection from ZooKeeper.
+ * {@code run} returns a state with the collection mapped to null when its ZK node is missing, a freshly read copy when the cached one is absent or reported as modified, and the input state otherwise.
+ */
+public class RefreshCollectionMessage implements Overseer.Message {
+ public final Operation operation;
+ public final String collection;
+
+ public RefreshCollectionMessage(String collection) {
+ this.operation = Operation.REFRESH_COLL;
+ this.collection = collection;
+ }
+
+ ClusterState run(ClusterState clusterState, Overseer overseer) throws InterruptedException, KeeperException {
+ Stat stat = overseer.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collection), null, true);
+ if (stat == null) {
+ // no ZK node at the collection path: return a copy of the state with this collection mapped to null
+ return clusterState.copyWith(collection, null);
+ }
+ DocCollection coll = clusterState.getCollectionOrNull(collection);
+ if (coll != null && !coll.isModified(stat.getVersion(), stat.getCversion())) {
+ // cached collection is not modified relative to the node's version and cversion: nothing to refresh
+ return clusterState;
+ } else {
+ coll = ZkStateReader.getCollectionLive(overseer.getZkStateReader(), collection);
+ return clusterState.copyWith(collection, coll);
+ }
+ }
+
+ @Override
+ public Operation getOperation() {
+ return operation;
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index a9299cc..31d8675 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -41,9 +41,12 @@ import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
import org.apache.solr.cloud.Overseer;
+import org.apache.solr.cloud.RefreshCollectionMessage;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.cloud.overseer.ClusterStateMutator;
+import org.apache.solr.cloud.overseer.SliceMutator;
+import org.apache.solr.cloud.overseer.ZkWriteCommand;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Aliases;
@@ -51,6 +54,7 @@ import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
+import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.cloud.ZkConfigManager;
@@ -114,6 +118,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
final boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
final String alias = message.getStr(ALIAS, collectionName);
log.info("Create collection {}", collectionName);
+ final boolean isPrs = message.getBool(DocCollection.PER_REPLICA_STATE, false);
if (clusterState.hasCollection(collectionName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
}
@@ -148,6 +153,8 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
// fail fast if parameters are wrong or incomplete
List<String> shardNames = populateShardNames(message, router);
checkReplicaTypes(message);
+ DocCollection newColl = null;
+ final String collectionPath = ZkStateReader.getCollectionPath(collectionName);
AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
@@ -170,27 +177,38 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
}
createCollectionZkNode(stateManager, collectionName, collectionParams);
-
- ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
-
- // wait for a while until we see the collection
- TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, timeSource);
- boolean created = false;
- while (! waitUntil.hasTimedOut()) {
- waitUntil.sleep(100);
- created = ocmh.cloudManager.getClusterStateProvider().getClusterState().hasCollection(collectionName);
- if (created) break;
- }
- if (!created) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName);
+
+ if(isPrs) {
+ ZkWriteCommand command = new ClusterStateMutator(ocmh.cloudManager).createCollection(clusterState, message);
+ byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
+ ocmh.zkStateReader.getZkClient().create(collectionPath, data, CreateMode.PERSISTENT, true);
+ clusterState = clusterState.copyWith(collectionName, command.collection);
+ newColl = command.collection;
+ } else {
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
+
+ // wait for a while until we see the collection
+ TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, timeSource);
+ boolean created = false;
+ while (!waitUntil.hasTimedOut()) {
+ waitUntil.sleep(100);
+ created = ocmh.cloudManager.getClusterStateProvider().getClusterState().hasCollection(collectionName);
+ if (created) break;
+ }
+ if (!created) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName);
+ }
+
+ // refresh cluster state
+ clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
+ newColl = clusterState.getCollection(collectionName);
+
}
- // refresh cluster state
- clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
List<ReplicaPosition> replicaPositions = null;
try {
- replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, clusterState.getCollection(collectionName), message, shardNames, sessionWrapper);
+ replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, newColl, message, shardNames, sessionWrapper);
} catch (Assign.AssignmentException e) {
ZkNodeProps deleteMessage = new ZkNodeProps("name", collectionName);
new DeleteCollectionCmd(ocmh).call(clusterState, deleteMessage, results);
@@ -250,7 +268,16 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
ZkStateReader.NODE_NAME_PROP, nodeName,
ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(),
CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
- ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+ if(isPrs) {
+ ZkWriteCommand command = new SliceMutator(ocmh.cloudManager).addReplica(clusterState, props);
+ byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
+// log.info("collection updated : {}", new String(data, StandardCharsets.UTF_8));
+ ocmh.zkStateReader.getZkClient().setData(collectionPath, data, true);
+ clusterState = clusterState.copyWith(collectionName, command.collection);
+ newColl = command.collection;
+ } else {
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
+ }
}
// Need to create new params for each request
@@ -300,6 +327,23 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
shardRequestTracker.processResponses(results, shardHandler, false, null, Collections.emptySet());
@SuppressWarnings({"rawtypes"})
boolean failure = results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0;
+ if(isPrs) {
+ TimeOut timeout = new TimeOut(Integer.getInteger("solr.waitToSeeReplicasInStateTimeoutSeconds", 120), TimeUnit.SECONDS, timeSource); // could be a big cluster
+ PerReplicaStates prs = PerReplicaStates.fetch(collectionPath, ocmh.zkStateReader.getZkClient(), null);
+ while (!timeout.hasTimedOut()) {
+ if(prs.allActive()) break;
+ Thread.sleep(100);
+ prs = PerReplicaStates.fetch(collectionPath, ocmh.zkStateReader.getZkClient(), null);
+ }
+ if (prs.allActive()) {
+ // we have successfully found all replicas to be ACTIVE
+ // Now ask Overseer to fetch the latest state of collection
+ // from ZK
+ ocmh.overseer.submit(new RefreshCollectionMessage(collectionName));
+ } else {
+ failure = true;
+ }
+ }
if (failure) {
// Let's cleanup as we hit an exception
// We shouldn't be passing 'results' here for the cleanup as the response would then contain 'success'
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
index af4bb43..be40066 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
@@ -67,6 +67,8 @@ public class PerReplicaStates implements ReflectMapWriter {
@JsonProperty
public final SimpleMap<State> states;
+ private volatile Boolean allActive;
+
/**
* Construct with data read from ZK
* @param path path from where this is loaded
@@ -92,6 +94,17 @@ public class PerReplicaStates implements ReflectMapWriter {
}
+ /** Returns true only if every entry in {@code states} has state {@link Replica.State#ACTIVE}; with no entries the result stays true.
+ * The result is computed on the first call and cached in the {@code allActive} field, so later calls return it without rescanning. */
+ public boolean allActive() {
+ if (this.allActive != null) return allActive;
+ boolean[] result = new boolean[]{true};
+ states.forEachEntry((r, s) -> {
+ if (s.state != Replica.State.ACTIVE) result[0] = false;
+ });
+ return this.allActive = result[0];
+ }
+
/**Get the changed replicas
*/
public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {