You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2015/01/20 17:19:38 UTC

svn commit: r1653281 - in /lucene/dev/trunk/solr/core/src: java/org/apache/solr/handler/SnapPuller.java test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java

Author: shalin
Date: Tue Jan 20 16:19:38 2015
New Revision: 1653281

URL: http://svn.apache.org/r1653281
Log:
SOLR-6640: Close searchers before rollback and recovery to avoid index corruption

Added:
    lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java   (with props)
Modified:
    lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java

Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java?rev=1653281&r1=1653280&r2=1653281&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java Tue Jan 20 16:19:38 2015
@@ -56,10 +56,12 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -78,6 +80,8 @@ import org.apache.commons.io.IOUtils;
 import org.apache.http.client.HttpClient;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.SegmentCommitInfo;
+import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
@@ -405,13 +409,41 @@ public class SnapPuller {
         }
         
         if (!isFullCopyNeeded) {
-          // rollback - and do it before we download any files
-          // so we don't remove files we thought we didn't need
-          // to download later
-          solrCore.getUpdateHandler().getSolrCoreState()
-          .closeIndexWriter(core, true);
+          // a searcher might be using some flushed but committed segments
+          // because of soft commits (which open a searcher on IW's data)
+          // so we need to close the existing searcher on the last commit
+          // and wait until we are able to clean up all unused lucene files
+          if (solrCore.getCoreDescriptor().getCoreContainer().isZooKeeperAware()) {
+            solrCore.closeSearcher();
+          }
+
+          // rollback and reopen index writer and wait until all unused files
+          // are successfully deleted
+          solrCore.getUpdateHandler().newIndexWriter(true);
+          RefCounted<IndexWriter> writer = solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(null);
+          try {
+            IndexWriter indexWriter = writer.get();
+            int c = 0;
+            indexWriter.deleteUnusedFiles();
+            while (hasUnusedFiles(indexDir, commit)) {
+              indexWriter.deleteUnusedFiles();
+              LOG.info("Sleeping for 1000ms to wait for unused lucene index files to be delete-able");
+              Thread.sleep(1000);
+              c++;
+              if (c >= 30)  {
+                LOG.warn("SnapPuller unable to cleanup unused lucene index files so we must do a full copy instead");
+                isFullCopyNeeded = true;
+                break;
+              }
+            }
+            if (c > 0)  {
+              LOG.info("SnapPuller slept for " + (c * 1000) + "ms for unused lucene index files to be delete-able");
+            }
+          } finally {
+            writer.decref();
+          }
+          solrCore.getUpdateHandler().getSolrCoreState().closeIndexWriter(core, true);
         }
-        
         boolean reloadCore = false;
         
         try {
@@ -542,6 +574,24 @@ public class SnapPuller {
     }
   }
 
+  private boolean hasUnusedFiles(Directory indexDir, IndexCommit commit) throws IOException {
+    Set<String> currentFiles = new HashSet<>();
+    String segmentsFileName = commit.getSegmentsFileName();
+    SegmentInfos infos = SegmentInfos.readCommit(indexDir, segmentsFileName);
+    for (SegmentCommitInfo info : infos.asList()) {
+      Set<String> files = info.info.files(); // All files that belong to this segment
+      currentFiles.addAll(files);
+    }
+    String[] allFiles = indexDir.listAll();
+    for (String file : allFiles) {
+      if (!file.equals(segmentsFileName) && !currentFiles.contains(file)) {
+        LOG.info("Found unused file: " + file);
+        return true;
+      }
+    }
+    return false;
+  }
+
   private volatile Exception fsyncException;
 
   /**

Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java?rev=1653281&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java Tue Jan 20 16:19:38 2015
@@ -0,0 +1,104 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Replica;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+// See SOLR-6640
+public class RecoveryAfterSoftCommitTest extends AbstractFullDistribZkTestBase {
+
+  public RecoveryAfterSoftCommitTest() {
+    fixShardCount = true;
+    sliceCount = 1;
+    shardCount = 2;
+  }
+
+  @BeforeClass
+  public static void beforeTests() {
+    System.setProperty("solr.tests.maxBufferedDocs", "2");
+  }
+
+  @AfterClass
+  public static void afterTest()  {
+    System.clearProperty("solr.tests.maxBufferedDocs");
+  }
+
+  /**
+   * Overrides the parent implementation to install a SocketProxy in-front of the Jetty server.
+   */
+  @Override
+  public JettySolrRunner createJetty(File solrHome, String dataDir,
+                                     String shardList, String solrConfigOverride, String schemaOverride)
+      throws Exception
+  {
+    return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride);
+  }
+
+  @Override
+  public void doTest() throws Exception {
+    // flush twice
+    for (int i=0; i<4; i++) {
+      SolrInputDocument document = new SolrInputDocument();
+      document.addField("id", String.valueOf(i));
+      document.addField("a_t", "text_" + i);
+      cloudClient.add(document);
+    }
+
+    // soft-commit so searchers are open on un-committed but flushed segment files
+    AbstractUpdateRequest request = new UpdateRequest().setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true, true);
+    cloudClient.request(request);
+
+    Replica notLeader = ensureAllReplicasAreActive(DEFAULT_COLLECTION, "shard1", 1, 2, 30).get(0);
+    // ok, now introduce a network partition between the leader and the replica
+    SocketProxy proxy = getProxyForReplica(notLeader);
+
+    proxy.close();
+
+    // add more than 100 docs so that peer sync cannot be used for recovery
+    for (int i=5; i<115; i++) {
+      SolrInputDocument document = new SolrInputDocument();
+      document.addField("id", String.valueOf(i));
+      document.addField("a_t", "text_" + i);
+      cloudClient.add(document);
+    }
+
+    // Have the partition last at least 1 sec
+    // While this gives the impression that recovery is timing related, this is
+    // really only
+    // to give time for the state to be written to ZK before the test completes.
+    // In other words,
+    // without a brief pause, the test finishes so quickly that it doesn't give
+    // time for the recovery process to kick-in
+    Thread.sleep(2000L);
+
+    proxy.reopen();
+
+    List<Replica> notLeaders =
+        ensureAllReplicasAreActive(DEFAULT_COLLECTION, "shard1", 1, 2, 30);
+  }
+}
+