Posted to commits@solr.apache.org by no...@apache.org on 2023/07/13 08:33:51 UTC

[solr] branch main updated: SOLR-16871: Race condition in `CoordinatorHttpSolrCall` synthetic collection/replica init (#1762)

This is an automated email from the ASF dual-hosted git repository.

noble pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new fa024e8cbcd SOLR-16871: Race condition in `CoordinatorHttpSolrCall` synthetic collection/replica init (#1762)
fa024e8cbcd is described below

commit fa024e8cbcde73aba91c34b7aa47bbed795d8b79
Author: patsonluk <pa...@users.noreply.github.com>
AuthorDate: Thu Jul 13 01:33:44 2023 -0700

    SOLR-16871: Race condition in `CoordinatorHttpSolrCall` synthetic collection/replica init (#1762)
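
    For context on the change below: the synthetic collection used by coordinator nodes
    (and its per-node replica) is created lazily on the first request, so two concurrent
    requests can both attempt the creation and one of them fails with an "already exists"
    style error. The patch tolerates that failure, re-reads the cluster state, errors out
    only if the collection is still missing, and then waits for an active replica before
    serving the request. A hedged SolrJ sketch of the same create-then-verify pattern
    (illustrative only, not the patch code; the class name, method name and the
    1-shard/1-replica layout are placeholders):

    import java.io.IOException;
    import org.apache.solr.client.solrj.SolrServerException;
    import org.apache.solr.client.solrj.impl.CloudSolrClient;
    import org.apache.solr.client.solrj.request.CollectionAdminRequest;
    import org.apache.solr.common.SolrException;

    public final class TolerantCreateSketch {
      // Attempts to create `collection`, tolerating the case where a concurrent request
      // created it first; fails only if the collection still cannot be found afterwards.
      static void ensureCollection(CloudSolrClient client, String collection, String configName)
          throws SolrServerException, IOException {
        SolrException createFailure = null;
        try {
          CollectionAdminRequest.createCollection(collection, configName, 1, 1).process(client);
        } catch (SolrException e) {
          // likely "collection already exists", thrown because another request won the race
          createFailure = e;
        }
        // re-read the cluster state regardless of how the create attempt ended
        if (client.getClusterState().getCollectionOrNull(collection) == null) {
          // nobody managed to create it; surface the original failure if there was one
          throw createFailure != null
              ? createFailure
              : new SolrException(
                  SolrException.ErrorCode.SERVER_ERROR,
                  "Could not locate collection [" + collection + "] after creation attempt");
        }
      }
    }
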
---
 .../solr/servlet/CoordinatorHttpSolrCall.java      |  77 ++++++++++++++-
 .../apache/solr/search/TestCoordinatorRole.java    | 108 ++++++++++++++++++++-
 2 files changed, 178 insertions(+), 7 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java
index 169a2a02032..cef7270d943 100644
--- a/solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import org.apache.solr.api.CoordinatorV2HttpSolrCall;
@@ -87,7 +88,7 @@ public class CoordinatorHttpSolrCall extends HttpSolrCall {
       SolrCore core = null;
       if (coll != null) {
         String confName = coll.getConfigName();
-        String syntheticCollectionName = SYNTHETIC_COLL_PREFIX + confName;
+        String syntheticCollectionName = getSyntheticCollectionName(confName);
 
         DocCollection syntheticColl = clusterState.getCollectionOrNull(syntheticCollectionName);
         if (syntheticColl == null) {
@@ -96,9 +97,32 @@ public class CoordinatorHttpSolrCall extends HttpSolrCall {
             log.info(
                 "synthetic collection: {} does not exist, creating.. ", syntheticCollectionName);
           }
-          createColl(syntheticCollectionName, solrCall.cores, confName);
-          syntheticColl =
-              zkStateReader.getClusterState().getCollectionOrNull(syntheticCollectionName);
+
+          SolrException createException = null;
+          try {
+            createColl(syntheticCollectionName, solrCall.cores, confName);
+          } catch (SolrException exception) {
+            // concurrent requests may have already created the collection, causing a
+            // "collection already exists" exception here
+            createException = exception;
+          } finally {
+            syntheticColl =
+                zkStateReader.getClusterState().getCollectionOrNull(syntheticCollectionName);
+          }
+
+          // at this point the collection really was not created, neither by this request nor by
+          // any concurrent one
+          if (syntheticColl == null) {
+            if (createException != null) {
+              throw createException; // rethrow the exception since such collection was not created
+            } else {
+              throw new SolrException(
+                  SolrException.ErrorCode.SERVER_ERROR,
+                  "Could not locate synthetic collection ["
+                      + syntheticCollectionName
+                      + "] after creation!");
+            }
+          }
         }
         List<Replica> nodeNameSyntheticReplicas =
             syntheticColl.getReplicas(solrCall.cores.getZkController().getNodeName());
@@ -112,6 +136,32 @@ public class CoordinatorHttpSolrCall extends HttpSolrCall {
 
           addReplica(syntheticCollectionName, solrCall.cores);
         }
+        // we still have to ensure that a replica is active, otherwise super.getCoreByCollection
+        // will return null and CoordinatorHttpSolrCall will call getCore again,
+        // creating a calling loop
+        try {
+          zkStateReader.waitForState(
+              syntheticCollectionName,
+              10,
+              TimeUnit.SECONDS,
+              docCollection -> {
+                for (Replica nodeNameSyntheticReplica :
+                    docCollection.getReplicas(solrCall.cores.getZkController().getNodeName())) {
+                  if (nodeNameSyntheticReplica.getState() == Replica.State.ACTIVE) {
+                    return true;
+                  }
+                }
+                return false;
+              });
+        } catch (Exception e) {
+          throw new SolrException(
+              SolrException.ErrorCode.SERVER_ERROR,
+              "Failed to wait for active replica for synthetic collection ["
+                  + syntheticCollectionName
+                  + "]",
+              e);
+        }
+
         core = solrCall.getCoreByCollection(syntheticCollectionName, isPreferLeader);
         if (core != null) {
           factory.collectionVsCoreNameMapping.put(collectionName, core.getName());
@@ -142,6 +192,10 @@ public class CoordinatorHttpSolrCall extends HttpSolrCall {
     }
   }
 
+  public static String getSyntheticCollectionName(String configName) {
+    return SYNTHETIC_COLL_PREFIX + configName;
+  }
+
   /**
    * Overrides the MDC context as the core set was synthetic core, which does not reflect the
    * collection being operated on
@@ -158,9 +212,13 @@ public class CoordinatorHttpSolrCall extends HttpSolrCall {
   private static void addReplica(String syntheticCollectionName, CoreContainer cores) {
     SolrQueryResponse rsp = new SolrQueryResponse();
     try {
+      String coreName =
+          syntheticCollectionName + "_" + cores.getZkController().getNodeName().replace(':', '_');
       CollectionAdminRequest.AddReplica addReplicaRequest =
           CollectionAdminRequest.addReplicaToShard(syntheticCollectionName, "shard1")
-              .setCreateNodeSet(cores.getZkController().getNodeName());
+              // fix the core name so that no two replicas are created on the same node
+              .setCoreName(coreName)
+              .setNode(cores.getZkController().getNodeName());
       addReplicaRequest.setWaitForFinalState(true);
       cores
           .getCollectionsHandler()
@@ -171,6 +229,15 @@ public class CoordinatorHttpSolrCall extends HttpSolrCall {
             "Could not auto-create collection: " + Utils.toJSONString(rsp.getValues()));
       }
     } catch (SolrException e) {
+      if (e.getMessage().contains("replica with the same core name already exists")) {
+        // another request has already created a replica for this synthetic collection
+        if (log.isInfoEnabled()) {
+          log.info(
+              "A replica is already created in this node for synthetic collection: {}",
+              syntheticCollectionName);
+        }
+        return;
+      }
       throw e;
 
     } catch (Exception e) {
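
A note on the addReplica change above: pinning the core name to a node-derived value means
that a second concurrent request on the same node fails with a recognizable "replica with the
same core name already exists" error (which the patch then swallows) instead of creating a
duplicate replica. A hedged SolrJ sketch of that call shape, with placeholder collection,
shard and node values (the wrapper class and method are illustrative):

    import java.io.IOException;
    import org.apache.solr.client.solrj.SolrClient;
    import org.apache.solr.client.solrj.SolrServerException;
    import org.apache.solr.client.solrj.request.CollectionAdminRequest;

    public final class AddReplicaSketch {
      // Adds at most one replica of `collection` on `nodeName`, mirroring the patch above.
      static void addReplicaOnNode(SolrClient client, String collection, String nodeName)
          throws SolrServerException, IOException {
        // deterministic core name: a second attempt on the same node fails fast instead of
        // silently creating a duplicate replica
        String coreName = collection + "_" + nodeName.replace(':', '_');
        CollectionAdminRequest.AddReplica req =
            CollectionAdminRequest.addReplicaToShard(collection, "shard1")
                .setCoreName(coreName)
                .setNode(nodeName);
        req.setWaitForFinalState(true); // wait until the new replica reaches its final state
        req.process(client);
      }
    }

Fixing the core name is what turns concurrent addReplica calls into an idempotent per-node
operation, which is why the duplicate-core error can safely be treated as "already done".
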
diff --git a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
index a0b1d1bef99..538c6b44703 100644
--- a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
+++ b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java
@@ -21,6 +21,7 @@ import static org.apache.solr.common.params.CommonParams.OMIT_HEADER;
 import static org.apache.solr.common.params.CommonParams.TRUE;
 
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.EnumSet;
 import java.util.HashSet;
@@ -33,6 +34,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -43,6 +45,7 @@ import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrDocumentList;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
@@ -69,7 +72,7 @@ public class TestCoordinatorRole extends SolrCloudTestCase {
     try {
       CloudSolrClient client = cluster.getSolrClient();
       String COLLECTION_NAME = "test_coll";
-      String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
+      String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.getSyntheticCollectionName("conf");
       CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2)
           .process(cluster.getSolrClient());
       cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4);
@@ -117,7 +120,7 @@ public class TestCoordinatorRole extends SolrCloudTestCase {
     try {
       CloudSolrClient client = cluster.getSolrClient();
       String COLLECTION_NAME = "test_coll";
-      String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX + "conf";
+      String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.getSyntheticCollectionName("conf");
       for (int j = 1; j <= 10; j++) {
         String collname = COLLECTION_NAME + "_" + j;
         CollectionAdminRequest.createCollection(collname, "conf", 2, 2)
@@ -481,6 +484,107 @@ public class TestCoordinatorRole extends SolrCloudTestCase {
     return (String) docs.get(0).getFieldValue("_core_");
   }
 
+  public void testConcurrentAccess() throws Exception {
+    final int DATA_NODE_COUNT = 2;
+    final int COORDINATOR_NODE_COUNT = 4;
+    MiniSolrCloudCluster cluster =
+        configureCluster(DATA_NODE_COUNT).addConfig("conf", configset("cloud-minimal")).configure();
+
+    List<String> dataNodes =
+        cluster.getJettySolrRunners().stream()
+            .map(JettySolrRunner::getNodeName)
+            .collect(Collectors.toUnmodifiableList());
+
+    try {
+      CloudSolrClient client = cluster.getSolrClient();
+      String COLLECTION_PREFIX = "test_coll_";
+
+      final int COLLECTION_COUNT = 10;
+      final int DOC_PER_COLLECTION_COUNT = 1000;
+
+      List<String> collectionNames = new ArrayList<>();
+      for (int i = 0; i < COLLECTION_COUNT; i++) {
+        String collectionName = COLLECTION_PREFIX + i;
+        CollectionAdminRequest.createCollection(collectionName, "conf", 2, 1)
+            .setCreateNodeSet(String.join(",", dataNodes)) // only put data onto the 2 data nodes
+            .process(cluster.getSolrClient());
+        cluster.waitForActiveCollection(collectionName, 2, 2);
+        collectionNames.add(collectionName);
+      }
+
+      for (String collectionName : collectionNames) {
+        UpdateRequest ur = new UpdateRequest();
+        for (int i = 0; i < DOC_PER_COLLECTION_COUNT; i++) {
+          SolrInputDocument doc2 = new SolrInputDocument();
+          doc2.addField("id", collectionName + "-" + i);
+          ur.add(doc2);
+        }
+        ur.commit(client, collectionName);
+        QueryResponse rsp = client.query(collectionName, new SolrQuery("*:*"));
+        assertEquals(DOC_PER_COLLECTION_COUNT, rsp.getResults().getNumFound());
+      }
+
+      System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on");
+      List<String> coordinatorNodes = new ArrayList<>();
+      try {
+        for (int i = 0; i < COORDINATOR_NODE_COUNT; i++) {
+          JettySolrRunner coordinatorJetty = cluster.startJettySolrRunner();
+          coordinatorNodes.add(coordinatorJetty.getNodeName());
+        }
+      } finally {
+        System.clearProperty(NodeRoles.NODE_ROLES_PROP);
+      }
+
+      int THREAD_COUNT = 10;
+      int RUN_COUNT = 20;
+      // final AtomicInteger runCounter = new AtomicInteger();
+      // 10 threads concurrently access the collections and verify that data is not mixed up
+      ExecutorService executorService =
+          ExecutorUtil.newMDCAwareFixedThreadPool(
+              THREAD_COUNT, new SolrNamedThreadFactory(this.getClass().getSimpleName()));
+      List<Future<?>> testFutures = new ArrayList<>();
+
+      for (int i = 0; i < RUN_COUNT; i++) {
+        final int currentRun = i;
+        testFutures.add(
+            executorService.submit(
+                () -> {
+                  final String collectionName =
+                      collectionNames.get(currentRun % collectionNames.size());
+                  final String coordinatorNode =
+                      coordinatorNodes.get(currentRun % coordinatorNodes.size());
+                  QueryResponse response =
+                      new QueryRequest(new SolrQuery("*:*"))
+                          .setPreferredNodes(List.of(coordinatorNode))
+                          .process(client, collectionName);
+                  assertEquals(DOC_PER_COLLECTION_COUNT, response.getResults().getNumFound());
+                  // ensure docs have the correct id (i.e. not mixed up with other collections)
+                  for (SolrDocument doc : response.getResults()) {
+                    assertTrue(((String) doc.getFieldValue("id")).startsWith(collectionName));
+                  }
+                  return null;
+                }));
+      }
+      for (Future<?> testFuture : testFutures) {
+        testFuture.get(); // check for any exceptions/failures
+      }
+
+      // number of replicas created in the synthetic collection should be one per coordinator node
+      assertEquals(
+          COORDINATOR_NODE_COUNT,
+          client
+              .getClusterState()
+              .getCollection(CoordinatorHttpSolrCall.getSyntheticCollectionName("conf"))
+              .getReplicas()
+              .size());
+
+      executorService.shutdown();
+      executorService.awaitTermination(10, TimeUnit.SECONDS);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   public void testWatch() throws Exception {
     final int DATA_NODE_COUNT = 2;
     MiniSolrCloudCluster cluster =