You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2016/09/07 15:37:07 UTC
lucene-solr:master: SOLR-9488: Shard split can fail to write commit
data on shutdown/restart causing replicas to recover without replicating the
index
Repository: lucene-solr
Updated Branches:
refs/heads/master ce84a8df1 -> 0c5c0df6b
SOLR-9488: Shard split can fail to write commit data on shutdown/restart causing replicas to recover without replicating the index
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/0c5c0df6
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/0c5c0df6
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/0c5c0df6
Branch: refs/heads/master
Commit: 0c5c0df6bc8d3738ef6ed071a0f51913f804dde1
Parents: ce84a8d
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Wed Sep 7 21:06:50 2016 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Wed Sep 7 21:06:50 2016 +0530
----------------------------------------------------------------------
solr/CHANGES.txt | 3 +
.../org/apache/solr/handler/IndexFetcher.java | 6 +-
.../solr/update/DirectUpdateHandler2.java | 3 +
.../apache/solr/update/SolrIndexSplitter.java | 16 +-
.../org/apache/solr/cloud/ShardSplitTest.java | 152 +++++++++++++++++++
5 files changed, 177 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0c5c0df6/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 5225c7c..9017af4 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -116,6 +116,9 @@ Bug Fixes
* SOLR-9381: Snitch for freedisk uses '/' instead of 'coreRootDirectory' (Tim Owen, noble)
+* SOLR-9488: Shard split can fail to write commit data on shutdown/restart causing replicas to recover
+ without replicating the index. This can cause data loss. (shalin)
+
Optimizations
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0c5c0df6/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 080cf9f..e38f722 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -308,6 +308,9 @@ public class IndexFetcher {
long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
long latestGeneration = (Long) response.get(GENERATION);
+ LOG.info("Master's generation: " + latestGeneration);
+ LOG.info("Master's version: " + latestVersion);
+
// TODO: make sure that getLatestCommit only returns commit points for the main index (i.e. no side-car indexes)
IndexCommit commit = solrCore.getDeletionPolicy().getLatestCommit();
if (commit == null) {
@@ -326,6 +329,7 @@ public class IndexFetcher {
}
}
+ LOG.info("Slave's generation: " + commit.getGeneration());
if (latestVersion == 0L) {
if (forceReplication && commit.getGeneration() != 0) {
@@ -353,8 +357,6 @@ public class IndexFetcher {
successfulInstall = true;
return true;
}
- LOG.info("Master's generation: " + latestGeneration);
- LOG.info("Slave's generation: " + commit.getGeneration());
LOG.info("Starting replication process");
// get the list of files first
fetchFileList(latestGeneration);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0c5c0df6/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index 0bdefa7..12552cd 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -519,6 +519,9 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
@SuppressForbidden(reason = "Need currentTimeMillis, commit time should be used only for debugging purposes, " +
" but currently suspiciously used for replication as well")
+ /*
+ Also see SolrIndexSplitter.setCommitData
+ */
private void setCommitData(IndexWriter iw) {
final Map<String,String> commitData = new HashMap<>();
commitData.put(SolrIndexWriter.COMMIT_TIME_MSEC_KEY,
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0c5c0df6/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java b/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
index 5f1ea0e..14e7063 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
@@ -19,7 +19,9 @@ package org.apache.solr.update;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.FilterCodecReader;
@@ -40,6 +42,7 @@ import org.apache.lucene.util.IOUtils;
import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.HashBasedRouter;
+import org.apache.solr.common.util.SuppressForbidden;
import org.apache.solr.core.SolrCore;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.BitsFilteredPostingsEnum;
@@ -134,6 +137,11 @@ public class SolrIndexSplitter {
CodecReader subReader = SlowCodecReaderWrapper.wrap(leaves.get(segmentNumber).reader());
iw.addIndexes(new LiveDocsReader(subReader, segmentDocSets.get(segmentNumber)[partitionNumber]));
}
+ // we commit explicitly instead of sending a CommitUpdateCommand through the processor chain
+ // because the sub-shard cores will just ignore such a commit because the update log is not
+ // in active state at this time.
+ setCommitData(iw);
+ iw.commit();
success = true;
} finally {
if (iwRef != null) {
@@ -151,7 +159,13 @@ public class SolrIndexSplitter {
}
-
+ @SuppressForbidden(reason = "Need currentTimeMillis, commit time should be used only for debugging purposes, " +
+ " but currently suspiciously used for replication as well")
+ private void setCommitData(IndexWriter iw) {
+ final Map<String,String> commitData = new HashMap<>();
+ commitData.put(SolrIndexWriter.COMMIT_TIME_MSEC_KEY, String.valueOf(System.currentTimeMillis()));
+ iw.setLiveCommitData(commitData.entrySet());
+ }
FixedBitSet[] split(LeafReaderContext readerContext) throws IOException {
LeafReader reader = readerContext.reader();
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0c5c0df6/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
index 13e45cd..c8519e7 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
@@ -25,20 +25,25 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.CompositeIdRouter;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
@@ -56,6 +61,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
@@ -71,6 +77,12 @@ public class ShardSplitTest extends BasicDistributedZkTest {
schemaString = "schema15.xml"; // we need a string id
}
+ @Override
+ public void distribSetUp() throws Exception {
+ super.distribSetUp();
+ useFactory(null);
+ }
+
@Test
public void test() throws Exception {
@@ -91,6 +103,146 @@ public class ShardSplitTest extends BasicDistributedZkTest {
//waitForThingsToLevelOut(15);
}
+ /*
+ Creates a collection with replicationFactor=1, splits a shard. Restarts the sub-shard leader node.
+ Add a replica. Ensure count matches in leader and replica.
+ */
+ public void testSplitStaticIndexReplication() throws Exception {
+ waitForThingsToLevelOut(15);
+
+ DocCollection defCol = cloudClient.getZkStateReader().getClusterState().getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
+ Replica replica = defCol.getReplicas().get(0);
+ String nodeName = replica.getNodeName();
+
+ String collectionName = "testSplitStaticIndexReplication";
+ CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 1);
+ create.setMaxShardsPerNode(5); // some high number so we can create replicas without hindrance
+ create.setCreateNodeSet(nodeName); // we want to create the leader on a fixed node so that we know which one to restart later
+ create.process(cloudClient);
+ try (CloudSolrClient client = getCloudSolrClient(zkServer.getZkAddress(), true, cloudClient.getLbClient().getHttpClient())) {
+ client.setDefaultCollection(collectionName);
+ StoppableIndexingThread thread = new StoppableIndexingThread(controlClient, client, "i1", true);
+ try {
+ thread.start();
+ Thread.sleep(1000); // give the indexer sometime to do its work
+ thread.safeStop();
+ thread.join();
+ client.commit();
+ controlClient.commit();
+
+ CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName);
+ splitShard.setShardName(SHARD1);
+ String asyncId = splitShard.processAsync(client);
+ RequestStatusState state = CollectionAdminRequest.requestStatus(asyncId).waitFor(client, 120);
+ if (state == RequestStatusState.COMPLETED) {
+ waitForRecoveriesToFinish(collectionName, true);
+ // let's wait to see parent shard become inactive
+ CountDownLatch latch = new CountDownLatch(1);
+ client.getZkStateReader().registerCollectionStateWatcher(collectionName, new CollectionStateWatcher() {
+ @Override
+ public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
+ Slice parent = collectionState.getSlice(SHARD1);
+ Slice slice10 = collectionState.getSlice(SHARD1_0);
+ Slice slice11 = collectionState.getSlice(SHARD1_1);
+ if (slice10 != null && slice11 != null &&
+ parent.getState() == Slice.State.INACTIVE &&
+ slice10.getState() == Slice.State.ACTIVE &&
+ slice11.getState() == Slice.State.ACTIVE) {
+ latch.countDown();
+ return true; // removes the watch
+ }
+ return false;
+ }
+ });
+ latch.await(1, TimeUnit.MINUTES);
+ if (latch.getCount() != 0) {
+ // sanity check
+ fail("Sub-shards did not become active even after waiting for 1 minute");
+ }
+
+ int liveNodeCount = client.getZkStateReader().getClusterState().getLiveNodes().size();
+
+ // restart the sub-shard leader node
+ boolean restarted = false;
+ for (JettySolrRunner jetty : jettys) {
+ int port = jetty.getBaseUrl().getPort();
+ if (replica.getStr(BASE_URL_PROP).contains(":" + port)) {
+ ChaosMonkey.kill(jetty);
+ ChaosMonkey.start(jetty);
+ restarted = true;
+ break;
+ }
+ }
+ if (!restarted) {
+ // sanity check
+ fail("We could not find a jetty to kill for replica: " + replica.getCoreUrl());
+ }
+
+ // add a new replica for the sub-shard
+ CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest.addReplicaToShard(collectionName, SHARD1_0);
+ // use control client because less chances of it being the node being restarted
+ // this is to avoid flakiness of test because of NoHttpResponseExceptions
+ String control_collection = client.getZkStateReader().getClusterState().getCollection("control_collection").getReplicas().get(0).getStr(BASE_URL_PROP);
+ try (HttpSolrClient control = new HttpSolrClient.Builder(control_collection).withHttpClient(client.getLbClient().getHttpClient()).build()) {
+ state = addReplica.processAndWait(control, 30);
+ }
+ if (state == RequestStatusState.COMPLETED) {
+ CountDownLatch newReplicaLatch = new CountDownLatch(1);
+ client.getZkStateReader().registerCollectionStateWatcher(collectionName, new CollectionStateWatcher() {
+ @Override
+ public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
+ if (liveNodes.size() != liveNodeCount) {
+ return false;
+ }
+ Slice slice = collectionState.getSlice(SHARD1_0);
+ if (slice.getReplicas().size() == 2) {
+ if (!slice.getReplicas().stream().anyMatch(r -> r.getState() == Replica.State.RECOVERING)) {
+ // we see replicas and none of them are recovering
+ newReplicaLatch.countDown();
+ return true;
+ }
+ }
+ return false;
+ }
+ });
+ newReplicaLatch.await(30, TimeUnit.SECONDS);
+ // check consistency of sub-shard replica explicitly because checkShardConsistency methods doesn't
+ // handle new shards/replica so well.
+ ClusterState clusterState = client.getZkStateReader().getClusterState();
+ DocCollection collection = clusterState.getCollection(collectionName);
+ int numReplicasChecked = assertConsistentReplicas(collection.getSlice(SHARD1_0));
+ assertEquals("We should have checked consistency for exactly 2 replicas of shard1_0", 2, numReplicasChecked);
+ } else {
+ fail("Adding a replica to sub-shard did not complete even after waiting for 30 seconds!. Saw state = " + state.getKey());
+ }
+ } else {
+ fail("We expected shard split to succeed on a static index but it didn't. Found state = " + state.getKey());
+ }
+ } finally {
+ thread.safeStop();
+ thread.join();
+ }
+ }
+ }
+
+ private int assertConsistentReplicas(Slice shard) throws SolrServerException, IOException {
+ long numFound = Long.MIN_VALUE;
+ int count = 0;
+ for (Replica replica : shard.getReplicas()) {
+ HttpSolrClient client = new HttpSolrClient.Builder(replica.getCoreUrl())
+ .withHttpClient(cloudClient.getLbClient().getHttpClient()).build();
+ QueryResponse response = client.query(new SolrQuery("q", "*:*", "distrib", "false"));
+ log.info("Found numFound={} on replica: {}", response.getResults().getNumFound(), replica.getCoreUrl());
+ if (numFound == Long.MIN_VALUE) {
+ numFound = response.getResults().getNumFound();
+ } else {
+ assertEquals("Shard " + shard.getName() + " replicas do not have same number of documents", numFound, response.getResults().getNumFound());
+ }
+ count++;
+ }
+ return count;
+ }
+
/**
* Used to test that we can split a shard when a previous split event
* left sub-shards in construction or recovery state.