You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that did not survive the plain-text conversion.
Posted to commits@solr.apache.org by no...@apache.org on 2023/07/13 08:33:51 UTC
[solr] branch main updated: SOLR-16871: Race condition in `CoordinatorHttpSolrCall` synthetic collection/replica init (#1762)
This is an automated email from the ASF dual-hosted git repository.
noble pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new fa024e8cbcd SOLR-16871: Race condition in `CoordinatorHttpSolrCall` synthetic collection/replica init (#1762)
fa024e8cbcd is described below
commit fa024e8cbcde73aba91c34b7aa47bbed795d8b79
Author: patsonluk <pa...@users.noreply.github.com>
AuthorDate: Thu Jul 13 01:33:44 2023 -0700
SOLR-16871: Race condition in `CoordinatorHttpSolrCall` synthetic collection/replica init (#1762)
---
.../solr/servlet/CoordinatorHttpSolrCall.java | 77 ++++++++++++++-
.../apache/solr/search/TestCoordinatorRole.java | 108 ++++++++++++++++++++-
2 files changed, 178 insertions(+), 7 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java
index 169a2a02032..cef7270d943 100644
--- a/solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.solr.api.CoordinatorV2HttpSolrCall;
@@ -87,7 +88,7 @@ public class CoordinatorHttpSolrCall extends HttpSolrCall {
SolrCore core = null;
if (coll != null) {
String confName = coll.getConfigName();
- String syntheticCollectionName = SYNTHETIC_COLL_PREFIX + confName;
+ String syntheticCollectionName = getSyntheticCollectionName(confName);
DocCollection syntheticColl = clusterState.getCollectionOrNull(syntheticCollectionName);
if (syntheticColl == null) {
@@ -96,9 +97,32 @@ public class CoordinatorHttpSolrCall extends HttpSolrCall {
log.info(
"synthetic collection: {} does not exist, creating.. ", syntheticCollectionName);
}
- createColl(syntheticCollectionName, solrCall.cores, confName);
- syntheticColl =
- zkStateReader.getClusterState().getCollectionOrNull(syntheticCollectionName);
+
+ SolrException createException = null;
+ try {
+ createColl(syntheticCollectionName, solrCall.cores, confName);
+ } catch (SolrException exception) {
+ // concurrent requests could have created the collection hence causing collection exists
+ // exception
+ createException = exception;
+ } finally {
+ syntheticColl =
+ zkStateReader.getClusterState().getCollectionOrNull(syntheticCollectionName);
+ }
+
+ // then indeed the collection was not created properly, either by this or other concurrent
+ // requests
+ if (syntheticColl == null) {
+ if (createException != null) {
+ throw createException; // rethrow the exception since such collection was not created
+ } else {
+ throw new SolrException(
+ SolrException.ErrorCode.SERVER_ERROR,
+ "Could not locate synthetic collection ["
+ + syntheticCollectionName
+ + "] after creation!");
+ }
+ }
}
List<Replica> nodeNameSyntheticReplicas =
syntheticColl.getReplicas(solrCall.cores.getZkController().getNodeName());
@@ -112,6 +136,32 @@ public class CoordinatorHttpSolrCall extends HttpSolrCall {
addReplica(syntheticCollectionName, solrCall.cores);
}
+ // still have to ensure that it's active, otherwise super.getCoreByCollection
+ // will return null and then CoordinatorHttpSolrCall will call getCore again
+ // hence creating a calling loop
+ try {
+ zkStateReader.waitForState(
+ syntheticCollectionName,
+ 10,
+ TimeUnit.SECONDS,
+ docCollection -> {
+ for (Replica nodeNameSyntheticReplica :
+ docCollection.getReplicas(solrCall.cores.getZkController().getNodeName())) {
+ if (nodeNameSyntheticReplica.getState() == Replica.State.ACTIVE) {
+ return true;
+ }
+ }
+ return false;
+ });
+ } catch (Exception e) {
+ throw new SolrException(
+ SolrException.ErrorCode.SERVER_ERROR,
+ "Failed to wait for active replica for synthetic collection ["
+ + syntheticCollectionName
+ + "]",
+ e);
+ }
+
core = solrCall.getCoreByCollection(syntheticCollectionName, isPreferLeader);
if (core != null) {
factory.collectionVsCoreNameMapping.put(collectionName, core.getName());
@@ -142,6 +192,10 @@ public class CoordinatorHttpSolrCall extends HttpSolrCall {
}
}
+ public static String getSyntheticCollectionName(String configName) {
+ return SYNTHETIC_COLL_PREFIX + configName;
+ }
+
/**
* Overrides the MDC context as the core set was synthetic core, which does not reflect the
* collection being operated on
@@ -158,9 +212,13 @@ public class CoordinatorHttpSolrCall extends HttpSolrCall {
private static void addReplica(String syntheticCollectionName, CoreContainer cores) {
SolrQueryResponse rsp = new SolrQueryResponse();
try {
+ String coreName =
+ syntheticCollectionName + "_" + cores.getZkController().getNodeName().replace(':', '_');
CollectionAdminRequest.AddReplica addReplicaRequest =
CollectionAdminRequest.addReplicaToShard(syntheticCollectionName, "shard1")
- .setCreateNodeSet(cores.getZkController().getNodeName());
+ // we are fixing the name, so that no two replicas are created in the same node
+ .setCoreName(coreName)
+ .setNode(cores.getZkController().getNodeName());
addReplicaRequest.setWaitForFinalState(true);
cores
.getCollectionsHandler()
@@ -171,6 +229,15 @@ public class CoordinatorHttpSolrCall extends HttpSolrCall {
"Could not auto-create collection: " + Utils.toJSONString(rsp.getValues()));
}
} catch (SolrException e) {
+ if (e.getMessage().contains("replica with the same core name already exists")) {
+ // another request has already created a replica for this synthetic collection
+ if (log.isInfoEnabled()) {
+ log.info(
+ "A replica is already created in this node for synthetic collection: {}",
+ syntheticCollectionName);
+ }
+ return;
+ }
throw e;
} catch (Exception e) {
diff --git a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
index a0b1d1bef99..538c6b44703 100644
--- a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
+++ b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
@@ -21,6 +21,7 @@ import static org.apache.solr.common.params.CommonParams.OMIT_HEADER;
import static org.apache.solr.common.params.CommonParams.TRUE;
import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashSet;
@@ -33,6 +34,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -43,6 +45,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
@@ -69,7 +72,7 @@ public class TestCoordinatorRole extends SolrCloudTestCase {
try {
CloudSolrClient client = cluster.getSolrClient();
String COLLECTION_NAME = "test_coll";
- String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
+ String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.getSyntheticCollectionName("conf");
CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2)
.process(cluster.getSolrClient());
cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
@@ -117,7 +120,7 @@ public class TestCoordinatorRole extends SolrCloudTestCase {
try {
CloudSolrClient client = cluster.getSolrClient();
String COLLECTION_NAME = "test_coll";
- String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
+ String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.getSyntheticCollectionName("conf");
for (int j = 1; j <= 10; j++) {
String collname = COLLECTION_NAME + "_" + j;
CollectionAdminRequest.createCollection(collname, "conf", 2, 2)
@@ -481,6 +484,107 @@ public class TestCoordinatorRole extends SolrCloudTestCase {
return (String) docs.get(0).getFieldValue("_core_");
}
+ public void testConcurrentAccess() throws Exception {
+ final int DATA_NODE_COUNT = 2;
+ final int COORDINATOR_NODE_COUNT = 4;
+ MiniSolrCloudCluster cluster =
+ configureCluster(DATA_NODE_COUNT).addConfig("conf", configset("cloud-minimal")).configure();
+
+ List<String> dataNodes =
+ cluster.getJettySolrRunners().stream()
+ .map(JettySolrRunner::getNodeName)
+ .collect(Collectors.toUnmodifiableList());
+
+ try {
+ CloudSolrClient client = cluster.getSolrClient();
+ String COLLECTION_PREFIX = "test_coll_";
+
+ final int COLLECTION_COUNT = 10;
+ final int DOC_PER_COLLECTION_COUNT = 1000;
+
+ List<String> collectionNames = new ArrayList<>();
+ for (int i = 0; i < COLLECTION_COUNT; i++) {
+ String collectionName = COLLECTION_PREFIX + i;
+ CollectionAdminRequest.createCollection(collectionName, "conf", 2, 1)
+ .setCreateNodeSet(String.join(",", dataNodes)) // only put data onto the 2 data nodes
+ .process(cluster.getSolrClient());
+ cluster.waitForActiveCollection(collectionName, 2, 2);
+ collectionNames.add(collectionName);
+ }
+
+ for (String collectionName : collectionNames) {
+ UpdateRequest ur = new UpdateRequest();
+ for (int i = 0; i < DOC_PER_COLLECTION_COUNT; i++) {
+ SolrInputDocument doc2 = new SolrInputDocument();
+ doc2.addField("id", collectionName + "-" + i);
+ ur.add(doc2);
+ }
+ ur.commit(client, collectionName);
+ QueryResponse rsp = client.query(collectionName, new SolrQuery("*:*"));
+ assertEquals(DOC_PER_COLLECTION_COUNT, rsp.getResults().getNumFound());
+ }
+
+ System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
+ List<String> coordinatorNodes = new ArrayList<>();
+ try {
+ for (int i = 0; i < COORDINATOR_NODE_COUNT; i++) {
+ JettySolrRunner coordinatorJetty = cluster.startJettySolrRunner();
+ coordinatorNodes.add(coordinatorJetty.getNodeName());
+ }
+ } finally {
+ System.clearProperty(NodeRoles.NODE_ROLES_PROP);
+ }
+
+ int THREAD_COUNT = 10;
+ int RUN_COUNT = 20;
+ // final AtomicInteger runCounter = new AtomicInteger();
+ // 10 threads to concurrently access the collections and ensure data are not mixed up
+ ExecutorService executorService =
+ ExecutorUtil.newMDCAwareFixedThreadPool(
+ THREAD_COUNT, new SolrNamedThreadFactory(this.getClass().getSimpleName()));
+ List<Future<?>> testFutures = new ArrayList<>();
+
+ for (int i = 0; i < RUN_COUNT; i++) {
+ final int currentRun = i;
+ testFutures.add(
+ executorService.submit(
+ () -> {
+ final String collectionName =
+ collectionNames.get(currentRun % collectionNames.size());
+ final String coordinatorNode =
+ coordinatorNodes.get(currentRun % coordinatorNodes.size());
+ QueryResponse response =
+ new QueryRequest(new SolrQuery("*:*"))
+ .setPreferredNodes(List.of(coordinatorNode))
+ .process(client, collectionName);
+ assertEquals(DOC_PER_COLLECTION_COUNT, response.getResults().getNumFound());
+ // ensure docs have the correct id (ie not mixing up with other collections)
+ for (SolrDocument doc : response.getResults()) {
+ assertTrue(((String) doc.getFieldValue("id")).startsWith(collectionName));
+ }
+ return null;
+ }));
+ }
+ for (Future<?> testFuture : testFutures) {
+ testFuture.get(); // check for any exceptions/failures
+ }
+
+ // number of replicas created in the synthetic collection should be one per coordinator node
+ assertEquals(
+ COORDINATOR_NODE_COUNT,
+ client
+ .getClusterState()
+ .getCollection(CoordinatorHttpSolrCall.getSyntheticCollectionName("conf"))
+ .getReplicas()
+ .size());
+
+ executorService.shutdown();
+ executorService.awaitTermination(10, TimeUnit.SECONDS);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
public void testWatch() throws Exception {
final int DATA_NODE_COUNT = 2;
MiniSolrCloudCluster cluster =