You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by yo...@apache.org on 2019/10/11 19:30:50 UTC

[lucene-solr] branch branch_8x updated: SOLR-13815: fix live split data loss due to cluster state change between checking current shard state and getting list of subShards (#920)

This is an automated email from the ASF dual-hosted git repository.

yonik pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new cc62b9f  SOLR-13815: fix live split data loss due to cluster state change between checking current shard state and getting list of subShards (#920)
cc62b9f is described below

commit cc62b9fac2302b8db627490efb88482ff6bbde54
Author: Yonik Seeley <yo...@apache.org>
AuthorDate: Fri Oct 11 15:07:03 2019 -0400

    SOLR-13815: fix live split data loss due to cluster state change between checking current shard state and getting list of subShards (#920)
    
    * SOLR-13815: add simple live split test to help debugging possible issue
    
    * SOLR-13815: fix live split data loss due to cluster state change berween checking current shard state and getting list of subShards
---
 solr/CHANGES.txt                                   |   4 +
 .../processor/DistributedZkUpdateProcessor.java    |  46 ++++---
 .../test/org/apache/solr/cloud/SplitShardTest.java | 138 +++++++++++++++++++++
 3 files changed, 174 insertions(+), 14 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index ac049a4..8f11d5d 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -252,6 +252,10 @@ Bug Fixes
 
 * SOLR-13829: RecursiveEvaluator casts Continuous numbers to Discrete Numbers, causing mismatch (Trey Grainger, Joel Bernstein)
 
+* SOLR-13815: Live shard split (where updates actively continue during the split) can lose updates due to cluster
+  state happening to change between checking if the current shard is active and later checking if there are any
+  sub-shard leaders to forward the update to. (yonik)
+
 Other Changes
 ----------------------
 
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 22e6956..a76b6be 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -83,6 +83,16 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
   private final String collection;
   private boolean readOnlyCollection = false;
 
+  // The cached immutable clusterState for the update... usually refreshed for each individual update.
+  // Different parts of this class used to request current clusterState views, which lead to subtle bugs and race conditions
+  // such as SOLR-13815 (live split data loss.)  Most likely, the only valid reasons for updating clusterState should be on
+  // certain types of failure + retry.
+  // Note: there may be other races related to
+  //   1) cluster topology change across multiple adds
+  //   2) use of methods directly on zkController that use a different clusterState
+  //   3) in general, not controlling carefully enough exactly when our view of clusterState is updated
+  protected ClusterState clusterState;
+
   // should we clone the document before sending it to replicas?
   // this is set to true in the constructor if the next processors in the chain
   // are custom and may modify the SolrInputDocument racing with its serialization for replication
@@ -103,7 +113,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());
     cloneRequiredOnLeader = isCloneRequiredOnLeader(next);
     collection = cloudDesc.getCollectionName();
-    DocCollection coll = zkController.getClusterState().getCollectionOrNull(collection);
+    clusterState = zkController.getClusterState();
+    DocCollection coll = clusterState.getCollectionOrNull(collection);
     if (coll != null) {
       // check readOnly property in coll state
       readOnlyCollection = coll.isReadOnly();
@@ -138,6 +149,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
   @Override
   public void processCommit(CommitUpdateCommand cmd) throws IOException {
+    clusterState = zkController.getClusterState();
 
     assert TestInjection.injectFailUpdateRequests();
 
@@ -216,6 +228,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
   @Override
   public void processAdd(AddUpdateCommand cmd) throws IOException {
+    clusterState = zkController.getClusterState();
+
     assert TestInjection.injectFailUpdateRequests();
 
     if (isReadOnly()) {
@@ -235,7 +249,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
   protected void doDistribAdd(AddUpdateCommand cmd) throws IOException {
 
     if (isLeader && !isSubShardLeader)  {
-      DocCollection coll = zkController.getClusterState().getCollection(collection);
+      DocCollection coll = clusterState.getCollection(collection);
       List<SolrCmdDistributor.Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getRootIdUsingRouteParam(), cmd.getSolrInputDocument());
       // the list<node> will actually have only one element for an add request
       if (subShardLeaders != null && !subShardLeaders.isEmpty()) {
@@ -246,7 +260,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
         params.set(DISTRIB_FROM_PARENT, cloudDesc.getShardId());
         cmdDistrib.distribAdd(cmd, subShardLeaders, params, true);
       }
-      final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getRootIdUsingRouteParam(), cmd.getSolrInputDocument());
+      final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(clusterState, coll, cmd.getRootIdUsingRouteParam(), cmd.getSolrInputDocument());
       if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty())  {
         ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
         params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
@@ -290,6 +304,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
   @Override
   public void processDelete(DeleteUpdateCommand cmd) throws IOException {
+    clusterState = zkController.getClusterState();
+
     if (isReadOnly()) {
       throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
     }
@@ -311,7 +327,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
   @Override
   protected void doDistribDeleteById(DeleteUpdateCommand cmd) throws IOException {
     if (isLeader && !isSubShardLeader)  {
-      DocCollection coll = zkController.getClusterState().getCollection(collection);
+      DocCollection coll = clusterState.getCollection(collection);
       List<SolrCmdDistributor.Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getId(), null);
       // the list<node> will actually have only one element for an add request
       if (subShardLeaders != null && !subShardLeaders.isEmpty()) {
@@ -323,7 +339,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
         cmdDistrib.distribDelete(cmd, subShardLeaders, params, true, null, null);
       }
 
-      final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getId(), null);
+      final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(clusterState, coll, cmd.getId(), null);
       if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty())  {
         ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
         params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
@@ -366,7 +382,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     //       - log + execute the local DBQ
     DistribPhase phase = DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
 
-    DocCollection coll = zkController.getClusterState().getCollection(collection);
+    DocCollection coll = clusterState.getCollection(collection);
 
     if (DistribPhase.NONE == phase) {
       if (rollupReplicationTracker == null) {
@@ -485,7 +501,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
       if (subShardLeaders != null) {
         cmdDistrib.distribDelete(cmd, subShardLeaders, params, true, rollupReplicationTracker, leaderReplicationTracker);
       }
-      final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, null, null);
+      final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(clusterState, coll, null, null);
       if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
         params = new ModifiableSolrParams(filterParams(req.getParams()));
         params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
@@ -588,8 +604,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
       return null;
     }
 
-    ClusterState cstate = zkController.getClusterState();
-    DocCollection coll = cstate.getCollection(collection);
+    clusterState = zkController.getClusterState();
+    DocCollection coll = clusterState.getCollection(collection);
     Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
 
     if (slice == null) {
@@ -650,7 +666,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
         // that means I want to forward onto my replicas...
         // so get the replicas...
         forwardToLeader = false;
-        ClusterState clusterState = zkController.getZkStateReader().getClusterState();
         String leaderCoreNodeName = leaderReplica.getName();
         List<Replica> replicas = clusterState.getCollection(collection)
             .getSlice(shardId)
@@ -733,7 +748,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
 
   private List<SolrCmdDistributor.Node> getCollectionUrls(String collection, EnumSet<Replica.Type> types, boolean onlyLeaders) {
-    ClusterState clusterState = zkController.getClusterState();
     final DocCollection docCollection = clusterState.getCollectionOrNull(collection);
     if (collection == null || docCollection.getSlicesMap() == null) {
       throw new ZooKeeperException(SolrException.ErrorCode.BAD_REQUEST,
@@ -804,7 +818,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
   }
 
   protected List<SolrCmdDistributor.Node> getReplicaNodesForLeader(String shardId, Replica leaderReplica) {
-    ClusterState clusterState = zkController.getZkStateReader().getClusterState();
     String leaderCoreNodeName = leaderReplica.getName();
     List<Replica> replicas = clusterState.getCollection(collection)
         .getSlice(shardId)
@@ -858,7 +871,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
                 || coll.getRouter().isTargetSlice(docId, doc, req.getParams(), aslice.getName(), coll))) {
           Replica sliceLeader = aslice.getLeader();
           // slice leader can be null because node/shard is created zk before leader election
-          if (sliceLeader != null && zkController.getClusterState().liveNodesContain(sliceLeader.getNodeName()))  {
+          if (sliceLeader != null && clusterState.liveNodesContain(sliceLeader.getNodeName()))  {
             if (nodes == null) nodes = new ArrayList<>();
             ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(sliceLeader);
             nodes.add(new SolrCmdDistributor.StdNode(nodeProps, coll.getName(), aslice.getName()));
@@ -955,7 +968,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     if (isReplayOrPeersync) return;
 
     String from = req.getParams().get(DISTRIB_FROM);
-    ClusterState clusterState = zkController.getClusterState();
 
     DocCollection docCollection = clusterState.getCollection(collection);
     Slice mySlice = docCollection.getSlice(cloudDesc.getShardId());
@@ -1015,6 +1027,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
   @Override
   public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
+    clusterState = zkController.getClusterState();
+
     if (isReadOnly()) {
       throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
     }
@@ -1023,6 +1037,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
   @Override
   public void processRollback(RollbackUpdateCommand cmd) throws IOException {
+    clusterState = zkController.getClusterState();
+
     if (isReadOnly()) {
       throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
     }
@@ -1031,6 +1047,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
   @Override
   public void finish() throws IOException {
+    clusterState = zkController.getClusterState();
+
     assertNotFinished();
 
     doFinish();
diff --git a/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java b/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java
index 9d4b74c..71ff72c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java
@@ -18,19 +18,32 @@
 package org.apache.solr.cloud;
 
 import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SplitShardTest extends SolrCloudTestCase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final String COLLECTION_NAME = "splitshardtest-collection";
 
@@ -133,4 +146,129 @@ public class SplitShardTest extends SolrCloudTestCase {
     assertEquals("wrong range in s1_1", expected1, delta1);
   }
 
+
+  CloudSolrClient createCollection(String collectionName, int repFactor) throws Exception {
+
+      CollectionAdminRequest
+          .createCollection(collectionName, "conf", 1, repFactor)
+          .setMaxShardsPerNode(100)
+          .process(cluster.getSolrClient());
+
+    cluster.waitForActiveCollection(collectionName, 1, repFactor);
+
+    CloudSolrClient client = cluster.getSolrClient();
+    client.setDefaultCollection(collectionName);
+    return client;
+  }
+
+
+  long getNumDocs(CloudSolrClient client) throws Exception {
+    String collectionName = client.getDefaultCollection();
+    DocCollection collection = client.getZkStateReader().getClusterState().getCollection(collectionName);
+    Collection<Slice> slices = collection.getSlices();
+
+    long totCount = 0;
+    for (Slice slice : slices) {
+      if (!slice.getState().equals(Slice.State.ACTIVE)) continue;
+      long lastReplicaCount = -1;
+      for (Replica replica : slice.getReplicas()) {
+        SolrClient replicaClient = getHttpSolrClient(replica.getBaseUrl() + "/" + replica.getCoreName());
+        long numFound = 0;
+        try {
+          numFound = replicaClient.query(params("q", "*:*", "distrib", "false")).getResults().getNumFound();
+          log.info("Replica count=" + numFound + " for " + replica);
+        } finally {
+          replicaClient.close();
+        }
+        if (lastReplicaCount >= 0) {
+          assertEquals("Replica doc count for " + replica, lastReplicaCount, numFound);
+        }
+        lastReplicaCount = numFound;
+      }
+      totCount += lastReplicaCount;
+    }
+
+
+    long cloudClientDocs = client.query(new SolrQuery("*:*")).getResults().getNumFound();
+    assertEquals("Sum of shard count should equal distrib query doc count", totCount, cloudClientDocs);
+    return totCount;
+  }
+
+
+  void doLiveSplitShard(String collectionName, int repFactor) throws Exception {
+    final CloudSolrClient client = createCollection(collectionName, repFactor);
+
+    final AtomicBoolean doIndex = new AtomicBoolean(true);
+    final AtomicInteger docsIndexed = new AtomicInteger();
+    Thread indexThread = null;
+    try {
+      // start indexing client before we initiate a shard split
+      indexThread = new Thread(() -> {
+        while (doIndex.get()) {
+          try {
+            // Thread.sleep(10);  // uncomment this to cap indexing rate at 100 docs per second...
+            int currDoc = docsIndexed.get();
+
+            // Try all docs in the same update request
+            UpdateRequest updateReq = new UpdateRequest();
+            updateReq.add(sdoc("id", "doc_" + currDoc));
+            UpdateResponse ursp = updateReq.commit(client, collectionName);
+            assertEquals(0, ursp.getStatus());  // for now, don't accept any failures
+            if (ursp.getStatus() == 0) {
+              docsIndexed.incrementAndGet();
+            }
+          } catch (Exception e) {
+            fail(e.getMessage());
+            break;
+          }
+        }
+      });
+      indexThread.start();
+
+      Thread.sleep(100);  // wait for a few docs to be indexed before invoking split
+      int docCount = docsIndexed.get();
+
+      CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName)
+          .setShardName("shard1");
+      splitShard.process(client);
+      waitForState("Timed out waiting for sub shards to be active.",
+          collectionName, activeClusterShape(2, 3*repFactor));  // 2 repFactor for the new split shards, 1 repFactor for old replicas
+
+      // make sure that docs were able to be indexed during the split
+      assertTrue(docsIndexed.get() > docCount);
+
+      Thread.sleep(100);  // wait for a few more docs to be indexed after split
+
+    } finally {
+      // shut down the indexer
+      doIndex.set(false);
+      if (indexThread != null) {
+        indexThread.join();
+      }
+    }
+
+    assertTrue(docsIndexed.get() > 0);
+
+    long numDocs = getNumDocs(client);
+    if (numDocs != docsIndexed.get()) {
+      // Find out what docs are missing.
+      for (int i = 0; i < docsIndexed.get(); i++) {
+        String id = "doc_" + i;
+        long cloudClientDocs = client.query(new SolrQuery("id:" + id)).getResults().getNumFound();
+        if (cloudClientDocs != 1) {
+          log.error("MISSING DOCUMENT " + id);
+        }
+      }
+    }
+
+    assertEquals("Documents are missing!", docsIndexed.get(), numDocs);
+    log.info("Number of documents indexed and queried : " + numDocs);
+  }
+
+  @Test
+  public void testLiveSplit() throws Exception {
+    doLiveSplitShard("livesplit1", 1);
+  }
+
+
 }