You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2015/01/20 17:19:38 UTC
svn commit: r1653281 - in /lucene/dev/trunk/solr/core/src:
java/org/apache/solr/handler/SnapPuller.java
test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java
Author: shalin
Date: Tue Jan 20 16:19:38 2015
New Revision: 1653281
URL: http://svn.apache.org/r1653281
Log:
SOLR-6640: Close searchers before rollback and recovery to avoid index corruption
Added:
lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java (with props)
Modified:
lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
Modified: lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java?rev=1653281&r1=1653280&r2=1653281&view=diff
==============================================================================
--- lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java (original)
+++ lucene/dev/trunk/solr/core/src/java/org/apache/solr/handler/SnapPuller.java Tue Jan 20 16:19:38 2015
@@ -56,10 +56,12 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -78,6 +80,8 @@ import org.apache.commons.io.IOUtils;
import org.apache.http.client.HttpClient;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.SegmentCommitInfo;
+import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
@@ -405,13 +409,41 @@ public class SnapPuller {
}
if (!isFullCopyNeeded) {
- // rollback - and do it before we download any files
- // so we don't remove files we thought we didn't need
- // to download later
- solrCore.getUpdateHandler().getSolrCoreState()
- .closeIndexWriter(core, true);
+ // a searcher might be using some flushed but not committed segments
+ // because of soft commits (which open a searcher on IW's data)
+ // so we need to close the existing searcher on the last commit
+ // and wait until we are able to clean up all unused lucene files
+ if (solrCore.getCoreDescriptor().getCoreContainer().isZooKeeperAware()) {
+ solrCore.closeSearcher();
+ }
+
+ // rollback and reopen index writer and wait until all unused files
+ // are successfully deleted
+ solrCore.getUpdateHandler().newIndexWriter(true);
+ RefCounted<IndexWriter> writer = solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(null);
+ try {
+ IndexWriter indexWriter = writer.get();
+ int c = 0;
+ indexWriter.deleteUnusedFiles();
+ while (hasUnusedFiles(indexDir, commit)) {
+ indexWriter.deleteUnusedFiles();
+ LOG.info("Sleeping for 1000ms to wait for unused lucene index files to be delete-able");
+ Thread.sleep(1000);
+ c++;
+ if (c >= 30) {
+ LOG.warn("SnapPuller unable to cleanup unused lucene index files so we must do a full copy instead");
+ isFullCopyNeeded = true;
+ break;
+ }
+ }
+ if (c > 0) {
+ LOG.info("SnapPuller slept for " + (c * 1000) + "ms for unused lucene index files to be delete-able");
+ }
+ } finally {
+ writer.decref();
+ }
+ solrCore.getUpdateHandler().getSolrCoreState().closeIndexWriter(core, true);
}
-
boolean reloadCore = false;
try {
@@ -542,6 +574,24 @@ public class SnapPuller {
}
}
+ /**
+ * Returns whether the index directory still contains files that are not referenced by the
+ * given commit point. The caller repeatedly asks the index writer to delete unused files and
+ * polls this method until it returns false (or gives up and falls back to a full copy).
+ *
+ * @param indexDir the local index directory to scan
+ * @param commit the commit point whose files are considered "in use"
+ * @return true if any file other than the commit's segments file, the files of its segments,
+ * or a lock file is present in the directory
+ * @throws IOException if the commit cannot be read or the directory cannot be listed
+ */
+ private boolean hasUnusedFiles(Directory indexDir, IndexCommit commit) throws IOException {
+ String segmentsFileName = commit.getSegmentsFileName();
+ SegmentInfos infos = SegmentInfos.readCommit(indexDir, segmentsFileName);
+ Set<String> currentFiles = new HashSet<>();
+ for (SegmentCommitInfo info : infos.asList()) {
+ // SegmentCommitInfo.files() (unlike SegmentInfo.files()) also includes per-commit files such as
+ // deletions and doc-values/field-infos updates; without them those files look "unused"
+ currentFiles.addAll(info.files());
+ }
+ for (String file : indexDir.listAll()) {
+ // the caller holds an IndexWriter open while polling this method, and its lock file
+ // (write.lock) may live in the index directory; it is not an unused index file
+ if (!file.equals(segmentsFileName) && !currentFiles.contains(file) && !file.endsWith(".lock")) {
+ LOG.info("Found unused file: " + file);
+ return true;
+ }
+ }
+ return false;
+ }
+
private volatile Exception fsyncException;
/**
Added: lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java
URL: http://svn.apache.org/viewvc/lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java?rev=1653281&view=auto
==============================================================================
--- lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java (added)
+++ lucene/dev/trunk/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java Tue Jan 20 16:19:38 2015
@@ -0,0 +1,104 @@
+package org.apache.solr.cloud;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.Replica;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+// See SOLR-6640
+/**
+ * Regression test for SOLR-6640: a replica whose searcher was opened by a soft commit on
+ * flushed but uncommitted segments must still recover after a network partition, where
+ * enough updates were missed that peer sync cannot be used.
+ */
+public class RecoveryAfterSoftCommitTest extends AbstractFullDistribZkTestBase {
+
+ public RecoveryAfterSoftCommitTest() {
+ fixShardCount = true;
+ sliceCount = 1;
+ shardCount = 2;
+ }
+
+ @BeforeClass
+ public static void beforeTests() {
+ // presumably read by the test solrconfig as maxBufferedDocs, so that every
+ // 2 added documents force a segment flush (TODO confirm against the config)
+ System.setProperty("solr.tests.maxBufferedDocs", "2");
+ }
+
+ @AfterClass
+ public static void afterTest() {
+ System.clearProperty("solr.tests.maxBufferedDocs");
+ }
+
+ /**
+ * Overrides the parent implementation to install a SocketProxy in-front of the Jetty server.
+ */
+ @Override
+ public JettySolrRunner createJetty(File solrHome, String dataDir,
+ String shardList, String solrConfigOverride, String schemaOverride)
+ throws Exception
+ {
+ return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride);
+ }
+
+ @Override
+ public void doTest() throws Exception {
+ // 4 docs with maxBufferedDocs=2 => flush twice
+ for (int i=0; i<4; i++) {
+ SolrInputDocument document = new SolrInputDocument();
+ document.addField("id", String.valueOf(i));
+ document.addField("a_t", "text_" + i);
+ cloudClient.add(document);
+ }
+
+ // soft-commit so searchers are open on un-committed but flushed segment files
+ AbstractUpdateRequest request = new UpdateRequest().setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true, true);
+ cloudClient.request(request);
+
+ Replica notLeader = ensureAllReplicasAreActive(DEFAULT_COLLECTION, "shard1", 1, 2, 30).get(0);
+ // ok, now introduce a network partition between the leader and the replica
+ SocketProxy proxy = getProxyForReplica(notLeader);
+
+ proxy.close();
+
+ // add more than 100 docs so that peer sync cannot be used for recovery
+ for (int i=5; i<115; i++) {
+ SolrInputDocument document = new SolrInputDocument();
+ document.addField("id", String.valueOf(i));
+ document.addField("a_t", "text_" + i);
+ cloudClient.add(document);
+ }
+
+ // Keep the partition in place for 2 seconds. This does not mean recovery is timing
+ // related; the pause only gives the state time to be written to ZK. Without it the
+ // test finishes so quickly that the recovery process never kicks in.
+ Thread.sleep(2000L);
+
+ proxy.reopen();
+
+ // wait for the partitioned replica to recover and become active again; the returned
+ // replica list is not needed (presumably the call fails the test if that never happens)
+ ensureAllReplicasAreActive(DEFAULT_COLLECTION, "shard1", 1, 2, 30);
+ }
+}
+