You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by is...@apache.org on 2021/02/13 03:18:35 UTC
[lucene-solr] branch branch_8_8 updated: Revert "SOLR-15138:
Collection creation for PerReplicaStates does not scale to large
collections as well as regular collections (closes #2359 and #2318)"
This is an automated email from the ASF dual-hosted git repository.
ishan pushed a commit to branch branch_8_8
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/branch_8_8 by this push:
new 5f065ac Revert "SOLR-15138: Collection creation for PerReplicaStates does not scale to large collections as well as regular collections (closes #2359 and #2318)"
5f065ac is described below
commit 5f065acfbdb10e35633859520bb59122f4809f0b
Author: Ishan Chattopadhyaya <is...@apache.org>
AuthorDate: Sat Feb 13 08:48:05 2021 +0530
Revert "SOLR-15138: Collection creation for PerReplicaStates does not scale to large collections as well as regular collections (closes #2359 and #2318)"
This reverts commit 22c716bcd946fa2d49e6cea53c0f0dd689954d76.
---
solr/CHANGES.txt | 3 -
.../src/java/org/apache/solr/cloud/Overseer.java | 24 +-----
.../solr/cloud/RefreshCollectionMessage.java | 51 ------------
.../cloud/api/collections/CreateCollectionCmd.java | 97 +++++-----------------
.../apache/solr/common/cloud/PerReplicaStates.java | 14 ----
.../solr/common/cloud/PerReplicaStatesOps.java | 18 ++--
.../client/solrj/impl/CloudSolrClientTest.java | 8 --
7 files changed, 30 insertions(+), 185 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 7831efd..1fe80b6 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -17,9 +17,6 @@ Bug Fixes
* SOLR-15136: Reduce excessive logging introduced with Per Replica States feature (Ishan Chattopadhyaya)
-* SOLR-15138: Collection creation for PerReplicaStates does not scale to large collections as well as regular collections
- (Mike Drob, Ilan Ginzburg, noble, Ishan Chattopadhyaya)
-
================== 8.8.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 4ca5703..3f52f74 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -29,7 +29,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import org.apache.lucene.util.Version;
@@ -96,7 +95,6 @@ public class Overseer implements SolrCloseable {
public static final int NUM_RESPONSES_TO_STORE = 10000;
public static final String OVERSEER_ELECT = "/overseer_elect";
- private final CopyOnWriteArrayList<Message> unprocessedMessages = new CopyOnWriteArrayList<>();
private SolrMetricsContext solrMetricsContext;
private volatile String metricTag = SolrMetricProducer.getUniqueMetricTag(this, null);
@@ -200,6 +198,8 @@ public class Overseer implements SolrCloseable {
if (log.isDebugEnabled()) {
log.debug("processMessage: fallbackQueueSize: {}, message = {}", fallbackQueue.getZkStats().getQueueLength(), message);
}
+ // force flush to ZK after each message because there is no fallback if workQueue items
+ // are removed from workQueue but fail to be written to ZK
try {
clusterState = processQueueItem(message, clusterState, zkStateWriter, false, null);
} catch (Exception e) {
@@ -260,13 +260,6 @@ public class Overseer implements SolrCloseable {
processedNodes.add(head.first());
fallbackQueueSize = processedNodes.size();
- // force flush to ZK after each message because there is no fallback if workQueue items
- // are removed from workQueue but fail to be written to ZK
- while (unprocessedMessages.size() > 0) {
- clusterState = zkStateWriter.writePendingUpdates();
- Message m = unprocessedMessages.remove(0);
- clusterState = m.run(clusterState, Overseer.this);
- }
// The callback always be called on this thread
clusterState = processQueueItem(message, clusterState, zkStateWriter, true, () -> {
stateUpdateQueue.remove(processedNodes);
@@ -1025,17 +1018,4 @@ public class Overseer implements SolrCloseable {
getStateUpdateQueue().offer(data);
}
- /**
- * Submit an intra-process message which will be picked up and executed when {@link ClusterStateUpdater}'s
- * loop runs next time
- */
- public void submit(Message message) {
- unprocessedMessages.add(message);
- }
-
- public interface Message {
- ClusterState run(ClusterState clusterState, Overseer overseer) throws Exception;
-
- }
-
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java b/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
deleted file mode 100644
index 2f221af..0000000
--- a/solr/core/src/java/org/apache/solr/cloud/RefreshCollectionMessage.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.solr.cloud;
-
-import org.apache.solr.common.cloud.ClusterState;
-import org.apache.solr.common.cloud.DocCollection;
-import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.zookeeper.data.Stat;
-
-/**
- * Refresh the Cluster State for a given collection
- *
- */
-public class RefreshCollectionMessage implements Overseer.Message {
- public final String collection;
-
- public RefreshCollectionMessage(String collection) {
- this.collection = collection;
- }
-
- public ClusterState run(ClusterState clusterState, Overseer overseer) throws Exception {
- Stat stat = overseer.getZkStateReader().getZkClient().exists(ZkStateReader.getCollectionPath(collection), null, true);
- if (stat == null) {
- //collection does not exist
- return clusterState.copyWith(collection, null);
- }
- DocCollection coll = clusterState.getCollectionOrNull(collection);
- if (coll != null && !coll.isModified(stat.getVersion(), stat.getCversion())) {
- //our state is up to date
- return clusterState;
- } else {
- coll = ZkStateReader.getCollectionLive(overseer.getZkStateReader(), collection);
- return clusterState.copyWith(collection, coll);
- }
- }
-}
diff --git a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
index 6d9e67e..a9299cc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
+++ b/solr/core/src/java/org/apache/solr/cloud/api/collections/CreateCollectionCmd.java
@@ -29,7 +29,6 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
@@ -42,12 +41,9 @@ import org.apache.solr.client.solrj.cloud.autoscaling.NotEmptyException;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
import org.apache.solr.cloud.Overseer;
-import org.apache.solr.cloud.RefreshCollectionMessage;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ShardRequestTracker;
import org.apache.solr.cloud.overseer.ClusterStateMutator;
-import org.apache.solr.cloud.overseer.SliceMutator;
-import org.apache.solr.cloud.overseer.ZkWriteCommand;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Aliases;
@@ -55,7 +51,6 @@ import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.ImplicitDocRouter;
-import org.apache.solr.common.cloud.PerReplicaStates;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.cloud.ZkConfigManager;
@@ -119,7 +114,6 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
final boolean waitForFinalState = message.getBool(WAIT_FOR_FINAL_STATE, false);
final String alias = message.getStr(ALIAS, collectionName);
log.info("Create collection {}", collectionName);
- final boolean isPRS = message.getBool(DocCollection.PER_REPLICA_STATE, false);
if (clusterState.hasCollection(collectionName)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "collection already exists: " + collectionName);
}
@@ -154,8 +148,6 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
// fail fast if parameters are wrong or incomplete
List<String> shardNames = populateShardNames(message, router);
checkReplicaTypes(message);
- DocCollection newColl = null;
- final String collectionPath = ZkStateReader.getCollectionPath(collectionName);
AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
@@ -178,41 +170,27 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
}
createCollectionZkNode(stateManager, collectionName, collectionParams);
-
- if (isPRS) {
- // In case of a PRS collection, create the collection structure directly instead of resubmitting
- // to the overseer queue.
- // TODO: Consider doing this for all collections, not just the PRS collections.
- ZkWriteCommand command = new ClusterStateMutator(ocmh.cloudManager).createCollection(clusterState, message);
- byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
- ocmh.zkStateReader.getZkClient().create(collectionPath, data, CreateMode.PERSISTENT, true);
- clusterState = clusterState.copyWith(collectionName, command.collection);
- newColl = command.collection;
- } else {
- ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
-
- // wait for a while until we see the collection
- TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, timeSource);
- boolean created = false;
- while (!waitUntil.hasTimedOut()) {
- waitUntil.sleep(100);
- created = ocmh.cloudManager.getClusterStateProvider().getClusterState().hasCollection(collectionName);
- if (created) break;
- }
- if (!created) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName);
- }
-
- // refresh cluster state
- clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
- newColl = clusterState.getCollection(collectionName);
-
+
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(message));
+
+ // wait for a while until we see the collection
+ TimeOut waitUntil = new TimeOut(30, TimeUnit.SECONDS, timeSource);
+ boolean created = false;
+ while (! waitUntil.hasTimedOut()) {
+ waitUntil.sleep(100);
+ created = ocmh.cloudManager.getClusterStateProvider().getClusterState().hasCollection(collectionName);
+ if (created) break;
+ }
+ if (!created) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not fully create collection: " + collectionName);
}
+ // refresh cluster state
+ clusterState = ocmh.cloudManager.getClusterStateProvider().getClusterState();
List<ReplicaPosition> replicaPositions = null;
try {
- replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, newColl, message, shardNames, sessionWrapper);
+ replicaPositions = buildReplicaPositions(ocmh.cloudManager, clusterState, clusterState.getCollection(collectionName), message, shardNames, sessionWrapper);
} catch (Assign.AssignmentException e) {
ZkNodeProps deleteMessage = new ZkNodeProps("name", collectionName);
new DeleteCollectionCmd(ocmh).call(clusterState, deleteMessage, results);
@@ -252,7 +230,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
}
String coreName = Assign.buildSolrCoreName(ocmh.cloudManager.getDistribStateManager(),
- newColl,
+ ocmh.cloudManager.getClusterStateProvider().getClusterState().getCollection(collectionName),
replicaPosition.shard, replicaPosition.type, true);
if (log.isDebugEnabled()) {
log.debug(formatString("Creating core {0} as part of shard {1} of collection {2} on {3}"
@@ -272,18 +250,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
ZkStateReader.NODE_NAME_PROP, nodeName,
ZkStateReader.REPLICA_TYPE, replicaPosition.type.name(),
CommonAdminParams.WAIT_FOR_FINAL_STATE, Boolean.toString(waitForFinalState));
- if (isPRS) {
- // In case of a PRS collection, execute the ADDREPLICA directly instead of resubmitting
- // to the overseer queue.
- // TODO: Consider doing this for all collections, not just the PRS collections.
- ZkWriteCommand command = new SliceMutator(ocmh.cloudManager).addReplica(clusterState, props);
- byte[] data = Utils.toJSON(Collections.singletonMap(collectionName, command.collection));
- ocmh.zkStateReader.getZkClient().setData(collectionPath, data, true);
- clusterState = clusterState.copyWith(collectionName, command.collection);
- newColl = command.collection;
- } else {
- ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
- }
+ ocmh.overseer.offerStateUpdate(Utils.toJSON(props));
}
// Need to create new params for each request
@@ -322,16 +289,7 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
if(!isLegacyCloud) {
// wait for all replica entries to be created
- Map<String, Replica> replicas ;
- if (isPRS) {
- replicas = new ConcurrentHashMap<>();
- newColl.getSlices().stream().flatMap(slice -> slice.getReplicas().stream())
- .filter(r -> coresToCreate.containsKey(r.getCoreName())) // Only the elements that were asked for...
- .forEach(r -> replicas.putIfAbsent(r.getCoreName(), r)); // ...get added to the map
- } else {
- replicas = ocmh.waitToSeeReplicasInState(collectionName, coresToCreate.keySet());
- }
-
+ Map<String, Replica> replicas = ocmh.waitToSeeReplicasInState(collectionName, coresToCreate.keySet());
for (Map.Entry<String, ShardRequest> e : coresToCreate.entrySet()) {
ShardRequest sreq = e.getValue();
sreq.params.set(CoreAdminParams.CORE_NODE_NAME, replicas.get(e.getKey()).getName());
@@ -342,23 +300,6 @@ public class CreateCollectionCmd implements OverseerCollectionMessageHandler.Cmd
shardRequestTracker.processResponses(results, shardHandler, false, null, Collections.emptySet());
@SuppressWarnings({"rawtypes"})
boolean failure = results.get("failure") != null && ((SimpleOrderedMap)results.get("failure")).size() > 0;
- if (isPRS) {
- TimeOut timeout = new TimeOut(Integer.getInteger("solr.waitToSeeReplicasInStateTimeoutSeconds", 120), TimeUnit.SECONDS, timeSource); // could be a big cluster
- PerReplicaStates prs = PerReplicaStates.fetch(collectionPath, ocmh.zkStateReader.getZkClient(), null);
- while (!timeout.hasTimedOut()) {
- if(prs.allActive()) break;
- Thread.sleep(100);
- prs = PerReplicaStates.fetch(collectionPath, ocmh.zkStateReader.getZkClient(), null);
- }
- if (prs.allActive()) {
- // we have successfully found all replicas to be ACTIVE
- } else {
- failure = true;
- }
- // Now ask Overseer to fetch the latest state of collection
- // from ZK
- ocmh.overseer.submit(new RefreshCollectionMessage(collectionName));
- }
if (failure) {
// Let's cleanup as we hit an exception
// We shouldn't be passing 'results' here for the cleanup as the response would then contain 'success'
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
index 975f654..af4bb43 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStates.java
@@ -27,7 +27,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.apache.solr.cluster.api.SimpleMap;
@@ -68,8 +67,6 @@ public class PerReplicaStates implements ReflectMapWriter {
@JsonProperty
public final SimpleMap<State> states;
- private Boolean allActive;
-
/**
* Construct with data read from ZK
* @param path path from where this is loaded
@@ -95,17 +92,6 @@ public class PerReplicaStates implements ReflectMapWriter {
}
- /** Check and return if all replicas are ACTIVE
- */
- public boolean allActive() {
- if (this.allActive != null) return allActive;
- AtomicBoolean result = new AtomicBoolean(true);
- states.forEachEntry((r, s) -> {
- if (s.state != Replica.State.ACTIVE) result.set(false);
- });
- return this.allActive = result.get();
- }
-
/**Get the changed replicas
*/
public static Set<String> findModifiedReplicas(PerReplicaStates old, PerReplicaStates fresh) {
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
index 3d3a184..1bf0ecc 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/PerReplicaStatesOps.java
@@ -117,7 +117,7 @@ public class PerReplicaStatesOps {
public static PerReplicaStatesOps flipState(String replica, Replica.State newState, PerReplicaStates rs) {
return new PerReplicaStatesOps(prs -> {
List<PerReplicaStates.Operation> operations = new ArrayList<>(2);
- PerReplicaStates.State existing = prs.get(replica);
+ PerReplicaStates.State existing = rs.get(replica);
if (existing == null) {
operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(replica, newState, Boolean.FALSE, 0)));
} else {
@@ -125,7 +125,7 @@ public class PerReplicaStatesOps {
addDeleteStaleNodes(operations, existing);
}
if (log.isDebugEnabled()) {
- log.debug("flipState on {}, {} -> {}, ops :{}", prs.path, replica, newState, operations);
+ log.debug("flipState on {}, {} -> {}, ops :{}", rs.path, replica, newState, operations);
}
return operations;
}).init(rs);
@@ -161,7 +161,7 @@ public class PerReplicaStatesOps {
return new PerReplicaStatesOps(prs -> {
List<PerReplicaStates.Operation> ops = new ArrayList<>();
if (next != null) {
- PerReplicaStates.State st = prs.get(next);
+ PerReplicaStates.State st = rs.get(next);
if (st != null) {
if (!st.isLeader) {
ops.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(st.replica, Replica.State.ACTIVE, Boolean.TRUE, st.version + 1)));
@@ -177,7 +177,7 @@ public class PerReplicaStatesOps {
// now go through all other replicas and unset previous leader
for (String r : allReplicas) {
- PerReplicaStates.State st = prs.get(r);
+ PerReplicaStates.State st = rs.get(r);
if (st == null) continue;//unlikely
if (!Objects.equals(r, next)) {
if (st.isLeader) {
@@ -188,7 +188,7 @@ public class PerReplicaStatesOps {
}
}
if (log.isDebugEnabled()) {
- log.debug("flipLeader on:{}, {} -> {}, ops: {}", prs.path, allReplicas, next, ops);
+ log.debug("flipLeader on:{}, {} -> {}, ops: {}", rs.path, allReplicas, next, ops);
}
return ops;
}).init(rs);
@@ -202,10 +202,10 @@ public class PerReplicaStatesOps {
public static PerReplicaStatesOps deleteReplica(String replica, PerReplicaStates rs) {
return new PerReplicaStatesOps(prs -> {
List<PerReplicaStates.Operation> result;
- if (prs == null) {
+ if (rs == null) {
result = Collections.emptyList();
} else {
- PerReplicaStates.State state = prs.get(replica);
+ PerReplicaStates.State state = rs.get(replica);
result = addDeleteStaleNodes(new ArrayList<>(), state);
}
return result;
@@ -224,7 +224,7 @@ public class PerReplicaStatesOps {
return new PerReplicaStatesOps(prs -> {
List<PerReplicaStates.Operation> operations = new ArrayList<>();
for (String replica : replicas) {
- PerReplicaStates.State r = prs.get(replica);
+ PerReplicaStates.State r = rs.get(replica);
if (r != null) {
if (r.state == Replica.State.DOWN && !r.isLeader) continue;
operations.add(new PerReplicaStates.Operation(PerReplicaStates.Operation.Type.ADD, new PerReplicaStates.State(replica, Replica.State.DOWN, Boolean.FALSE, r.version + 1)));
@@ -234,7 +234,7 @@ public class PerReplicaStatesOps {
}
}
if (log.isDebugEnabled()) {
- log.debug("for coll: {} down replicas {}, ops {}", prs, replicas, operations);
+ log.debug("for coll: {} down replicas {}, ops {}", rs, replicas, operations);
}
return operations;
}).init(rs);
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
index b85499d..171785c 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java
@@ -1099,14 +1099,6 @@ public class CloudSolrClientTest extends SolrCloudTestCase {
c.forEachReplica((s, replica) -> assertNotNull(replica.getReplicaState()));
PerReplicaStates prs = PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), cluster.getZkClient(), null);
assertEquals(4, prs.states.size());
-
- // Now let's do an add replica
- CollectionAdminRequest
- .addReplicaToShard(testCollection, "shard1")
- .process(cluster.getSolrClient());
- prs = PerReplicaStates.fetch(ZkStateReader.getCollectionPath(testCollection), cluster.getZkClient(), null);
- assertEquals(5, prs.states.size());
-
testCollection = "perReplicaState_testv2";
new V2Request.Builder("/collections")
.withMethod(POST)