You are viewing a plain-text version of this content. The canonical link for it (a hyperlink in the original page) is not preserved in this copy.
Posted to commits@lucene.apache.org by yo...@apache.org on 2019/10/11 19:31:06 UTC
[lucene-solr] branch branch_8_3 updated: SOLR-13815: fix live split
data loss due to cluster state change between checking current shard state
and getting list of subShards (#920)
This is an automated email from the ASF dual-hosted git repository.
yonik pushed a commit to branch branch_8_3
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/branch_8_3 by this push:
new 503fe7e SOLR-13815: fix live split data loss due to cluster state change between checking current shard state and getting list of subShards (#920)
503fe7e is described below
commit 503fe7e9a9d5e80890fa7fe63c4fd56a161d0619
Author: Yonik Seeley <yo...@apache.org>
AuthorDate: Fri Oct 11 15:07:03 2019 -0400
SOLR-13815: fix live split data loss due to cluster state change between checking current shard state and getting list of subShards (#920)
* SOLR-13815: add simple live split test to help debugging possible issue
* SOLR-13815: fix live split data loss due to cluster state change between checking current shard state and getting list of subShards
---
solr/CHANGES.txt | 4 +
.../processor/DistributedZkUpdateProcessor.java | 46 ++++---
.../test/org/apache/solr/cloud/SplitShardTest.java | 138 +++++++++++++++++++++
3 files changed, 174 insertions(+), 14 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index c7598eb..b4a0a83 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -214,6 +214,10 @@ Bug Fixes
* SOLR-13829: RecursiveEvaluator casts Continuous numbers to Discrete Numbers, causing mismatch (Trey Grainger, Joel Bernstein)
+* SOLR-13815: Live shard split (where updates actively continue during the split) can lose updates due to cluster
+ state happening to change between checking if the current shard is active and later checking if there are any
+ sub-shard leaders to forward the update to. (yonik)
+
Other Changes
----------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 22e6956..a76b6be 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -83,6 +83,16 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
private final String collection;
private boolean readOnlyCollection = false;
+ // The cached immutable clusterState for the update... usually refreshed for each individual update.
+ // Different parts of this class used to request current clusterState views, which lead to subtle bugs and race conditions
+ // such as SOLR-13815 (live split data loss.) Most likely, the only valid reasons for updating clusterState should be on
+ // certain types of failure + retry.
+ // Note: there may be other races related to
+ // 1) cluster topology change across multiple adds
+ // 2) use of methods directly on zkController that use a different clusterState
+ // 3) in general, not controlling carefully enough exactly when our view of clusterState is updated
+ protected ClusterState clusterState;
+
// should we clone the document before sending it to replicas?
// this is set to true in the constructor if the next processors in the chain
// are custom and may modify the SolrInputDocument racing with its serialization for replication
@@ -103,7 +113,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());
cloneRequiredOnLeader = isCloneRequiredOnLeader(next);
collection = cloudDesc.getCollectionName();
- DocCollection coll = zkController.getClusterState().getCollectionOrNull(collection);
+ clusterState = zkController.getClusterState();
+ DocCollection coll = clusterState.getCollectionOrNull(collection);
if (coll != null) {
// check readOnly property in coll state
readOnlyCollection = coll.isReadOnly();
@@ -138,6 +149,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@Override
public void processCommit(CommitUpdateCommand cmd) throws IOException {
+ clusterState = zkController.getClusterState();
assert TestInjection.injectFailUpdateRequests();
@@ -216,6 +228,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
+ clusterState = zkController.getClusterState();
+
assert TestInjection.injectFailUpdateRequests();
if (isReadOnly()) {
@@ -235,7 +249,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
protected void doDistribAdd(AddUpdateCommand cmd) throws IOException {
if (isLeader && !isSubShardLeader) {
- DocCollection coll = zkController.getClusterState().getCollection(collection);
+ DocCollection coll = clusterState.getCollection(collection);
List<SolrCmdDistributor.Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getRootIdUsingRouteParam(), cmd.getSolrInputDocument());
// the list<node> will actually have only one element for an add request
if (subShardLeaders != null && !subShardLeaders.isEmpty()) {
@@ -246,7 +260,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
params.set(DISTRIB_FROM_PARENT, cloudDesc.getShardId());
cmdDistrib.distribAdd(cmd, subShardLeaders, params, true);
}
- final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getRootIdUsingRouteParam(), cmd.getSolrInputDocument());
+ final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(clusterState, coll, cmd.getRootIdUsingRouteParam(), cmd.getSolrInputDocument());
if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
@@ -290,6 +304,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@Override
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
+ clusterState = zkController.getClusterState();
+
if (isReadOnly()) {
throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
}
@@ -311,7 +327,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@Override
protected void doDistribDeleteById(DeleteUpdateCommand cmd) throws IOException {
if (isLeader && !isSubShardLeader) {
- DocCollection coll = zkController.getClusterState().getCollection(collection);
+ DocCollection coll = clusterState.getCollection(collection);
List<SolrCmdDistributor.Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getId(), null);
// the list<node> will actually have only one element for an add request
if (subShardLeaders != null && !subShardLeaders.isEmpty()) {
@@ -323,7 +339,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
cmdDistrib.distribDelete(cmd, subShardLeaders, params, true, null, null);
}
- final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getId(), null);
+ final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(clusterState, coll, cmd.getId(), null);
if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
@@ -366,7 +382,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
// - log + execute the local DBQ
DistribPhase phase = DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
- DocCollection coll = zkController.getClusterState().getCollection(collection);
+ DocCollection coll = clusterState.getCollection(collection);
if (DistribPhase.NONE == phase) {
if (rollupReplicationTracker == null) {
@@ -485,7 +501,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
if (subShardLeaders != null) {
cmdDistrib.distribDelete(cmd, subShardLeaders, params, true, rollupReplicationTracker, leaderReplicationTracker);
}
- final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, null, null);
+ final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(clusterState, coll, null, null);
if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
params = new ModifiableSolrParams(filterParams(req.getParams()));
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
@@ -588,8 +604,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
return null;
}
- ClusterState cstate = zkController.getClusterState();
- DocCollection coll = cstate.getCollection(collection);
+ clusterState = zkController.getClusterState();
+ DocCollection coll = clusterState.getCollection(collection);
Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
if (slice == null) {
@@ -650,7 +666,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
// that means I want to forward onto my replicas...
// so get the replicas...
forwardToLeader = false;
- ClusterState clusterState = zkController.getZkStateReader().getClusterState();
String leaderCoreNodeName = leaderReplica.getName();
List<Replica> replicas = clusterState.getCollection(collection)
.getSlice(shardId)
@@ -733,7 +748,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
private List<SolrCmdDistributor.Node> getCollectionUrls(String collection, EnumSet<Replica.Type> types, boolean onlyLeaders) {
- ClusterState clusterState = zkController.getClusterState();
final DocCollection docCollection = clusterState.getCollectionOrNull(collection);
if (collection == null || docCollection.getSlicesMap() == null) {
throw new ZooKeeperException(SolrException.ErrorCode.BAD_REQUEST,
@@ -804,7 +818,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
}
protected List<SolrCmdDistributor.Node> getReplicaNodesForLeader(String shardId, Replica leaderReplica) {
- ClusterState clusterState = zkController.getZkStateReader().getClusterState();
String leaderCoreNodeName = leaderReplica.getName();
List<Replica> replicas = clusterState.getCollection(collection)
.getSlice(shardId)
@@ -858,7 +871,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
|| coll.getRouter().isTargetSlice(docId, doc, req.getParams(), aslice.getName(), coll))) {
Replica sliceLeader = aslice.getLeader();
// slice leader can be null because node/shard is created zk before leader election
- if (sliceLeader != null && zkController.getClusterState().liveNodesContain(sliceLeader.getNodeName())) {
+ if (sliceLeader != null && clusterState.liveNodesContain(sliceLeader.getNodeName())) {
if (nodes == null) nodes = new ArrayList<>();
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(sliceLeader);
nodes.add(new SolrCmdDistributor.StdNode(nodeProps, coll.getName(), aslice.getName()));
@@ -955,7 +968,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
if (isReplayOrPeersync) return;
String from = req.getParams().get(DISTRIB_FROM);
- ClusterState clusterState = zkController.getClusterState();
DocCollection docCollection = clusterState.getCollection(collection);
Slice mySlice = docCollection.getSlice(cloudDesc.getShardId());
@@ -1015,6 +1027,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@Override
public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
+ clusterState = zkController.getClusterState();
+
if (isReadOnly()) {
throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
}
@@ -1023,6 +1037,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@Override
public void processRollback(RollbackUpdateCommand cmd) throws IOException {
+ clusterState = zkController.getClusterState();
+
if (isReadOnly()) {
throw new SolrException(ErrorCode.FORBIDDEN, "Collection " + collection + " is read-only.");
}
@@ -1031,6 +1047,8 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
@Override
public void finish() throws IOException {
+ clusterState = zkController.getClusterState();
+
assertNotFinished();
doFinish();
diff --git a/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java b/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java
index 9d4b74c..71ff72c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SplitShardTest.java
@@ -18,19 +18,32 @@
package org.apache.solr.cloud;
import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SplitShardTest extends SolrCloudTestCase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final String COLLECTION_NAME = "splitshardtest-collection";
@@ -133,4 +146,129 @@ public class SplitShardTest extends SolrCloudTestCase {
assertEquals("wrong range in s1_1", expected1, delta1);
}
+
+ CloudSolrClient createCollection(String collectionName, int repFactor) throws Exception {
+
+ CollectionAdminRequest
+ .createCollection(collectionName, "conf", 1, repFactor)
+ .setMaxShardsPerNode(100)
+ .process(cluster.getSolrClient());
+
+ cluster.waitForActiveCollection(collectionName, 1, repFactor);
+
+ CloudSolrClient client = cluster.getSolrClient();
+ client.setDefaultCollection(collectionName);
+ return client;
+ }
+
+
+ long getNumDocs(CloudSolrClient client) throws Exception {
+ String collectionName = client.getDefaultCollection();
+ DocCollection collection = client.getZkStateReader().getClusterState().getCollection(collectionName);
+ Collection<Slice> slices = collection.getSlices();
+
+ long totCount = 0;
+ for (Slice slice : slices) {
+ if (!slice.getState().equals(Slice.State.ACTIVE)) continue;
+ long lastReplicaCount = -1;
+ for (Replica replica : slice.getReplicas()) {
+ SolrClient replicaClient = getHttpSolrClient(replica.getBaseUrl() + "/" + replica.getCoreName());
+ long numFound = 0;
+ try {
+ numFound = replicaClient.query(params("q", "*:*", "distrib", "false")).getResults().getNumFound();
+ log.info("Replica count=" + numFound + " for " + replica);
+ } finally {
+ replicaClient.close();
+ }
+ if (lastReplicaCount >= 0) {
+ assertEquals("Replica doc count for " + replica, lastReplicaCount, numFound);
+ }
+ lastReplicaCount = numFound;
+ }
+ totCount += lastReplicaCount;
+ }
+
+
+ long cloudClientDocs = client.query(new SolrQuery("*:*")).getResults().getNumFound();
+ assertEquals("Sum of shard count should equal distrib query doc count", totCount, cloudClientDocs);
+ return totCount;
+ }
+
+
+ void doLiveSplitShard(String collectionName, int repFactor) throws Exception {
+ final CloudSolrClient client = createCollection(collectionName, repFactor);
+
+ final AtomicBoolean doIndex = new AtomicBoolean(true);
+ final AtomicInteger docsIndexed = new AtomicInteger();
+ Thread indexThread = null;
+ try {
+ // start indexing client before we initiate a shard split
+ indexThread = new Thread(() -> {
+ while (doIndex.get()) {
+ try {
+ // Thread.sleep(10); // uncomment this to cap indexing rate at 100 docs per second...
+ int currDoc = docsIndexed.get();
+
+ // Try all docs in the same update request
+ UpdateRequest updateReq = new UpdateRequest();
+ updateReq.add(sdoc("id", "doc_" + currDoc));
+ UpdateResponse ursp = updateReq.commit(client, collectionName);
+ assertEquals(0, ursp.getStatus()); // for now, don't accept any failures
+ if (ursp.getStatus() == 0) {
+ docsIndexed.incrementAndGet();
+ }
+ } catch (Exception e) {
+ fail(e.getMessage());
+ break;
+ }
+ }
+ });
+ indexThread.start();
+
+ Thread.sleep(100); // wait for a few docs to be indexed before invoking split
+ int docCount = docsIndexed.get();
+
+ CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName)
+ .setShardName("shard1");
+ splitShard.process(client);
+ waitForState("Timed out waiting for sub shards to be active.",
+ collectionName, activeClusterShape(2, 3*repFactor)); // 2 repFactor for the new split shards, 1 repFactor for old replicas
+
+ // make sure that docs were able to be indexed during the split
+ assertTrue(docsIndexed.get() > docCount);
+
+ Thread.sleep(100); // wait for a few more docs to be indexed after split
+
+ } finally {
+ // shut down the indexer
+ doIndex.set(false);
+ if (indexThread != null) {
+ indexThread.join();
+ }
+ }
+
+ assertTrue(docsIndexed.get() > 0);
+
+ long numDocs = getNumDocs(client);
+ if (numDocs != docsIndexed.get()) {
+ // Find out what docs are missing.
+ for (int i = 0; i < docsIndexed.get(); i++) {
+ String id = "doc_" + i;
+ long cloudClientDocs = client.query(new SolrQuery("id:" + id)).getResults().getNumFound();
+ if (cloudClientDocs != 1) {
+ log.error("MISSING DOCUMENT " + id);
+ }
+ }
+ }
+
+ assertEquals("Documents are missing!", docsIndexed.get(), numDocs);
+ log.info("Number of documents indexed and queried : " + numDocs);
+ }
+
+ @Test
+ public void testLiveSplit() throws Exception {
+ doLiveSplitShard("livesplit1", 1);
+ }
+
+
}