You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2016/09/12 09:33:49 UTC

[12/15] lucene-solr:branch_6_2: SOLR-9488: Shard split can fail to write commit data on shutdown/restart causing replicas to recover without replicating the index (cherry picked from commit 0c5c0df)

SOLR-9488: Shard split can fail to write commit data on shutdown/restart causing replicas to recover without replicating the index
(cherry picked from commit 0c5c0df)

(cherry picked from commit 7370407)


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/0af3d4e8
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/0af3d4e8
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/0af3d4e8

Branch: refs/heads/branch_6_2
Commit: 0af3d4e8e6ff1fa38d59739f30cfffea65a8033c
Parents: f4673ca
Author: Shalin Shekhar Mangar <sh...@apache.org>
Authored: Wed Sep 7 21:06:50 2016 +0530
Committer: Shalin Shekhar Mangar <sh...@apache.org>
Committed: Mon Sep 12 10:11:20 2016 +0530

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   3 +
 .../org/apache/solr/handler/IndexFetcher.java   |   6 +-
 .../solr/update/DirectUpdateHandler2.java       |   3 +
 .../apache/solr/update/SolrIndexSplitter.java   |  16 +-
 .../org/apache/solr/cloud/ShardSplitTest.java   | 152 +++++++++++++++++++
 5 files changed, 177 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0af3d4e8/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index cb04779..64a0b9d 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -48,6 +48,9 @@ Bug Fixes
 * SOLR-9444: Fix path usage for cloud backup/restore.  (Hrishikesh Gadre, Uwe Schindler, Varun Thacker)
 
 * SOLR-9381: Snitch for freedisk uses '/' instead of 'coreRootDirectory' (Tim Owen, noble)
+
+* SOLR-9488: Shard split can fail to write commit data on shutdown/restart causing replicas to recover
+  without replicating the index. This can cause data loss. (shalin)
 ==================  6.2.0 ==================
 
 Versions of Major Components

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0af3d4e8/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 634a9e0..c66b3ab 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -294,6 +294,9 @@ public class IndexFetcher {
       long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
       long latestGeneration = (Long) response.get(GENERATION);
 
+      LOG.info("Master's generation: " + latestGeneration);
+      LOG.info("Master's version: " + latestVersion);
+
       // TODO: make sure that getLatestCommit only returns commit points for the main index (i.e. no side-car indexes)
       IndexCommit commit = solrCore.getDeletionPolicy().getLatestCommit();
       if (commit == null) {
@@ -312,6 +315,7 @@ public class IndexFetcher {
         }
       }
 
+      LOG.info("Slave's generation: " + commit.getGeneration());
 
       if (latestVersion == 0L) {
         if (forceReplication && commit.getGeneration() != 0) {
@@ -339,8 +343,6 @@ public class IndexFetcher {
         successfulInstall = true;
         return true;
       }
-      LOG.info("Master's generation: " + latestGeneration);
-      LOG.info("Slave's generation: " + commit.getGeneration());
       LOG.info("Starting replication process");
       // get the list of files first
       fetchFileList(latestGeneration);

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0af3d4e8/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index 0bdefa7..12552cd 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -519,6 +519,9 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
 
   @SuppressForbidden(reason = "Need currentTimeMillis, commit time should be used only for debugging purposes, " +
       " but currently suspiciously used for replication as well")
+  /*
+  Also see SolrIndexSplitter.setCommitData
+   */
   private void setCommitData(IndexWriter iw) {
     final Map<String,String> commitData = new HashMap<>();
     commitData.put(SolrIndexWriter.COMMIT_TIME_MSEC_KEY,

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0af3d4e8/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java b/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
index 5f1ea0e..14e7063 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java
@@ -19,7 +19,9 @@ package org.apache.solr.update;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.lucene.index.CodecReader;
 import org.apache.lucene.index.FilterCodecReader;
@@ -40,6 +42,7 @@ import org.apache.lucene.util.IOUtils;
 import org.apache.solr.common.cloud.CompositeIdRouter;
 import org.apache.solr.common.cloud.DocRouter;
 import org.apache.solr.common.cloud.HashBasedRouter;
+import org.apache.solr.common.util.SuppressForbidden;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.schema.SchemaField;
 import org.apache.solr.search.BitsFilteredPostingsEnum;
@@ -134,6 +137,11 @@ public class SolrIndexSplitter {
           CodecReader subReader = SlowCodecReaderWrapper.wrap(leaves.get(segmentNumber).reader());
           iw.addIndexes(new LiveDocsReader(subReader, segmentDocSets.get(segmentNumber)[partitionNumber]));
         }
+        // we commit explicitly instead of sending a CommitUpdateCommand through the processor chain
+        // because the sub-shard cores would just ignore such a commit: their update log is not
+        // in active state at this time.
+        setCommitData(iw);
+        iw.commit();
         success = true;
       } finally {
         if (iwRef != null) {
@@ -151,7 +159,13 @@ public class SolrIndexSplitter {
 
   }
 
-
+  @SuppressForbidden(reason = "Need currentTimeMillis, commit time should be used only for debugging purposes, " +
+      " but currently suspiciously used for replication as well")
+  private void setCommitData(IndexWriter iw) {
+    final Map<String,String> commitData = new HashMap<>();
+    commitData.put(SolrIndexWriter.COMMIT_TIME_MSEC_KEY, String.valueOf(System.currentTimeMillis()));
+    iw.setLiveCommitData(commitData.entrySet());
+  }
 
   FixedBitSet[] split(LeafReaderContext readerContext) throws IOException {
     LeafReader reader = readerContext.reader();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/0af3d4e8/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
index 389660f..0000e7b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.http.params.CoreConnectionPNames;
 import org.apache.lucene.util.LuceneTestCase.Slow;
@@ -32,14 +34,17 @@ import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.response.RequestStatusState;
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
 import org.apache.solr.common.cloud.CompositeIdRouter;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.DocRouter;
@@ -57,6 +62,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.cloud.OverseerCollectionMessageHandler.NUM_SLICES;
+import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP;
 import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
 import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
 
@@ -72,6 +78,12 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     schemaString = "schema15.xml";      // we need a string id
   }
 
+  @Override
+  public void distribSetUp() throws Exception {
+    super.distribSetUp();
+    useFactory(null);
+  }
+
   @Test
   public void test() throws Exception {
 
@@ -92,6 +104,146 @@ public class ShardSplitTest extends BasicDistributedZkTest {
     //waitForThingsToLevelOut(15);
   }
 
+  /*
+  Creates a collection with replicationFactor=1 and splits a shard. Restarts the sub-shard leader node.
+  Adds a replica. Ensures that the document count matches in leader and replica.
+   */
+  public void testSplitStaticIndexReplication() throws Exception {
+    waitForThingsToLevelOut(15);
+
+    DocCollection defCol = cloudClient.getZkStateReader().getClusterState().getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
+    Replica replica = defCol.getReplicas().get(0);
+    String nodeName = replica.getNodeName();
+
+    String collectionName = "testSplitStaticIndexReplication";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 1);
+    create.setMaxShardsPerNode(5); // some high number so we can create replicas without hindrance
+    create.setCreateNodeSet(nodeName); // we want to create the leader on a fixed node so that we know which one to restart later
+    create.process(cloudClient);
+    try (CloudSolrClient client = getCloudSolrClient(zkServer.getZkAddress(), true, cloudClient.getLbClient().getHttpClient())) {
+      client.setDefaultCollection(collectionName);
+      StoppableIndexingThread thread = new StoppableIndexingThread(controlClient, client, "i1", true);
+      try {
+        thread.start();
+        Thread.sleep(1000); // give the indexer some time to do its work
+        thread.safeStop();
+        thread.join();
+        client.commit();
+        controlClient.commit();
+
+        CollectionAdminRequest.SplitShard splitShard = CollectionAdminRequest.splitShard(collectionName);
+        splitShard.setShardName(SHARD1);
+        String asyncId = splitShard.processAsync(client);
+        RequestStatusState state = CollectionAdminRequest.requestStatus(asyncId).waitFor(client, 120);
+        if (state == RequestStatusState.COMPLETED)  {
+          waitForRecoveriesToFinish(collectionName, true);
+          // let's wait to see parent shard become inactive
+          CountDownLatch latch = new CountDownLatch(1);
+          client.getZkStateReader().registerCollectionStateWatcher(collectionName, new CollectionStateWatcher() {
+            @Override
+            public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
+              Slice parent = collectionState.getSlice(SHARD1);
+              Slice slice10 = collectionState.getSlice(SHARD1_0);
+              Slice slice11 = collectionState.getSlice(SHARD1_1);
+              if (slice10 != null && slice11 != null &&
+                  parent.getState() == Slice.State.INACTIVE &&
+                  slice10.getState() == Slice.State.ACTIVE &&
+                  slice11.getState() == Slice.State.ACTIVE) {
+                latch.countDown();
+                return true; // removes the watch
+              }
+              return false;
+            }
+          });
+          latch.await(1, TimeUnit.MINUTES);
+          if (latch.getCount() != 0)  {
+            // sanity check
+            fail("Sub-shards did not become active even after waiting for 1 minute");
+          }
+
+          int liveNodeCount = client.getZkStateReader().getClusterState().getLiveNodes().size();
+
+          // restart the sub-shard leader node
+          boolean restarted = false;
+          for (JettySolrRunner jetty : jettys) {
+            int port = jetty.getBaseUrl().getPort();
+            if (replica.getStr(BASE_URL_PROP).contains(":" + port))  {
+              ChaosMonkey.kill(jetty);
+              ChaosMonkey.start(jetty);
+              restarted = true;
+              break;
+            }
+          }
+          if (!restarted) {
+            // sanity check
+            fail("We could not find a jetty to kill for replica: " + replica.getCoreUrl());
+          }
+
+          // add a new replica for the sub-shard
+          CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest.addReplicaToShard(collectionName, SHARD1_0);
+          // use the control client because it is less likely to be the node being restarted;
+          // this avoids test flakiness caused by NoHttpResponseExceptions
+          String control_collection = client.getZkStateReader().getClusterState().getCollection("control_collection").getReplicas().get(0).getStr(BASE_URL_PROP);
+          try (HttpSolrClient control = new HttpSolrClient.Builder(control_collection).withHttpClient(client.getLbClient().getHttpClient()).build())  {
+            state = addReplica.processAndWait(control, 30);
+          }
+          if (state == RequestStatusState.COMPLETED)  {
+            CountDownLatch newReplicaLatch = new CountDownLatch(1);
+            client.getZkStateReader().registerCollectionStateWatcher(collectionName, new CollectionStateWatcher() {
+              @Override
+              public boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
+                if (liveNodes.size() != liveNodeCount)  {
+                  return false;
+                }
+                Slice slice = collectionState.getSlice(SHARD1_0);
+                if (slice.getReplicas().size() == 2)  {
+                  if (!slice.getReplicas().stream().anyMatch(r -> r.getState() == Replica.State.RECOVERING)) {
+                    // we see replicas and none of them are recovering
+                    newReplicaLatch.countDown();
+                    return true;
+                  }
+                }
+                return false;
+              }
+            });
+            newReplicaLatch.await(30, TimeUnit.SECONDS);
+            // check consistency of the sub-shard replicas explicitly because the checkShardConsistency methods
+            // don't handle new shards/replicas so well.
+            ClusterState clusterState = client.getZkStateReader().getClusterState();
+            DocCollection collection = clusterState.getCollection(collectionName);
+            int numReplicasChecked = assertConsistentReplicas(collection.getSlice(SHARD1_0));
+            assertEquals("We should have checked consistency for exactly 2 replicas of shard1_0", 2, numReplicasChecked);
+          } else  {
+            fail("Adding a replica to sub-shard did not complete even after waiting for 30 seconds!. Saw state = " + state.getKey());
+          }
+        } else {
+          fail("We expected shard split to succeed on a static index but it didn't. Found state = " + state.getKey());
+        }
+      } finally {
+        thread.safeStop();
+        thread.join();
+      }
+    }
+  }
+
+  private int assertConsistentReplicas(Slice shard) throws SolrServerException, IOException {
+    long numFound = Long.MIN_VALUE;
+    int count = 0;
+    for (Replica replica : shard.getReplicas()) {
+      HttpSolrClient client = new HttpSolrClient.Builder(replica.getCoreUrl())
+          .withHttpClient(cloudClient.getLbClient().getHttpClient()).build();
+      QueryResponse response = client.query(new SolrQuery("q", "*:*", "distrib", "false"));
+      log.info("Found numFound={} on replica: {}", response.getResults().getNumFound(), replica.getCoreUrl());
+      if (numFound == Long.MIN_VALUE)  {
+        numFound = response.getResults().getNumFound();
+      } else  {
+        assertEquals("Shard " + shard.getName() + " replicas do not have same number of documents", numFound, response.getResults().getNumFound());
+      }
+      count++;
+    }
+    return count;
+  }
+
   /**
    * Used to test that we can split a shard when a previous split event
    * left sub-shards in construction or recovery state.