Posted to commits@lucene.apache.org by bu...@apache.org on 2010/07/27 22:33:06 UTC
svn commit: r979856 - in /lucene/dev/branches/realtime_search/lucene/src:
java/org/apache/lucene/index/ java/org/apache/lucene/store/
test/org/apache/lucene/index/
Author: buschmi
Date: Tue Jul 27 20:33:05 2010
New Revision: 979856
URL: http://svn.apache.org/viewvc?rev=979856&view=rev
Log:
LUCENE-2561: Fix most of the thread-safety and exception handling problems in the realtime search branch
Added:
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexReaderPool.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/store/FilterDirectory.java
Modified:
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java
lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java
lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestNRTReaderWithThreads.java
lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestThreadedOptimize.java
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriter.java Tue Jul 27 20:33:05 2010
@@ -2,13 +2,13 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.io.PrintStream;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -24,6 +24,8 @@ import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.BytesRef;
/**
@@ -51,7 +53,7 @@ final class DocumentsWriter {
private final DocumentsWriterThreadPool threadPool;
private final Lock sequenceIDLock = new ReentrantLock();
- private final Directory directory;
+ private final Directory openFilesTrackingDirectory;
final IndexWriter indexWriter;
final IndexWriterConfig config;
@@ -69,7 +71,14 @@ final class DocumentsWriter {
private Map<DocumentsWriterPerThread, Long> minSequenceIDsPerThread = new HashMap<DocumentsWriterPerThread, Long>();
public DocumentsWriter(Directory directory, IndexWriter indexWriter, IndexWriterConfig config) {
- this.directory = directory;
+ this.openFilesTrackingDirectory = new FilterDirectory(directory) {
+ @Override public IndexOutput createOutput(final String name) throws IOException {
+ addOpenFile(name);
+ return super.createOutput(name);
+ }
+ };
+
+ //this.openFilesTrackingDirectory = directory;
this.indexWriter = indexWriter;
this.config = config;
this.maxBufferedDocs = config.getMaxBufferedDocs();
@@ -111,7 +120,7 @@ final class DocumentsWriter {
}
DocumentsWriterPerThread newDocumentsWriterPerThread() {
- DocumentsWriterPerThread perThread = new DocumentsWriterPerThread(directory, this, config
+ DocumentsWriterPerThread perThread = new DocumentsWriterPerThread(openFilesTrackingDirectory, this, config
.getIndexingChain());
sequenceIDLock.lock();
try {
@@ -127,13 +136,23 @@ final class DocumentsWriter {
return updateDocument(null, doc, analyzer);
}
+ private final static class UpdateResult {
+ long sequenceID;
+ boolean flushed;
+
+ UpdateResult(long sequenceID) {
+ this.sequenceID = sequenceID;
+ flushed = false;
+ }
+ }
+
long updateDocument(final Term delTerm, final Document doc, final Analyzer analyzer)
throws CorruptIndexException, IOException {
- long seqID = threadPool.executePerThread(this, doc,
- new DocumentsWriterThreadPool.PerThreadTask<Long>() {
+ UpdateResult result = threadPool.executePerThread(this, doc,
+ new DocumentsWriterThreadPool.PerThreadTask<UpdateResult>() {
@Override
- public Long process(final DocumentsWriterPerThread perThread) throws IOException {
+ public UpdateResult process(final DocumentsWriterPerThread perThread) throws IOException {
long perThreadRAMUsedBeforeAdd = perThread.numBytesUsed;
perThread.addDocument(doc, analyzer);
@@ -154,16 +173,23 @@ final class DocumentsWriter {
sequenceIDLock.unlock();
}
+ UpdateResult result = new UpdateResult(sequenceID);
if (finishAddDocument(perThread, perThreadRAMUsedBeforeAdd)) {
+ result.flushed = true;
super.clearThreadBindings();
}
- return sequenceID;
+ return result;
}
});
+
+ if (result == null) {
+ return -1;
+ }
- indexWriter.maybeMerge();
-
- return seqID;
+ if (result.flushed) {
+ indexWriter.maybeMerge();
+ }
+ return result.sequenceID;
}
private final boolean finishAddDocument(DocumentsWriterPerThread perThread,
@@ -257,17 +283,12 @@ final class DocumentsWriter {
final boolean flushAllThreads(final boolean flushDeletes)
throws IOException {
- return threadPool.executeAllThreads(new DocumentsWriterThreadPool.AllThreadsTask<Boolean>() {
+
+ return threadPool.executeAllThreads(this, new DocumentsWriterThreadPool.AllThreadsTask<Boolean>() {
@Override
public Boolean process(Iterator<DocumentsWriterPerThread> threadsIterator) throws IOException {
boolean anythingFlushed = false;
- if (flushDeletes) {
- if (applyDeletes(indexWriter.segmentInfos)) {
- indexWriter.checkpoint();
- }
- }
-
while (threadsIterator.hasNext()) {
DocumentsWriterPerThread perThread = threadsIterator.next();
final int numDocs = perThread.getNumDocsInRAM();
@@ -282,6 +303,7 @@ final class DocumentsWriter {
if (flushDocs) {
SegmentInfo newSegment = perThread.flush();
+ newSegment.dir = indexWriter.getDirectory();
if (newSegment != null) {
anythingFlushed = true;
@@ -315,10 +337,8 @@ final class DocumentsWriter {
}
/** Build compound file for the segment we just flushed */
- void createCompoundFile(String segment, DocumentsWriterPerThread perThread) throws IOException {
-
- CompoundFileWriter cfsWriter = new CompoundFileWriter(directory,
- IndexFileNames.segmentFileName(segment, "", IndexFileNames.COMPOUND_FILE_EXTENSION));
+ void createCompoundFile(String compoundFileName, DocumentsWriterPerThread perThread) throws IOException {
+ CompoundFileWriter cfsWriter = new CompoundFileWriter(openFilesTrackingDirectory, compoundFileName);
for(String fileName : perThread.flushState.flushedFiles) {
cfsWriter.addFile(fileName);
}
@@ -327,49 +347,53 @@ final class DocumentsWriter {
cfsWriter.close();
}
- // nocommit
void finishFlushedSegment(SegmentInfo newSegment, DocumentsWriterPerThread perThread) throws IOException {
- synchronized(indexWriter) {
- indexWriter.segmentInfos.add(newSegment);
- indexWriter.checkpoint();
+ SegmentReader reader = indexWriter.readerPool.get(newSegment, false);
+ try {
+ applyDeletes(reader, newSegment.getMinSequenceID(), newSegment.getMaxSequenceID(), perThread.sequenceIDs);
+ } finally {
+ indexWriter.readerPool.release(reader);
+ }
- SegmentReader reader = indexWriter.readerPool.get(newSegment, false);
- boolean any = false;
+ if (indexWriter.useCompoundFile(newSegment)) {
+ String compoundFileName = IndexFileNames.segmentFileName(newSegment.name, "", IndexFileNames.COMPOUND_FILE_EXTENSION);
+ message("creating compound file " + compoundFileName);
+ // Now build compound file
+ boolean success = false;
try {
- any = applyDeletes(reader, newSegment.getMinSequenceID(), newSegment.getMaxSequenceID(), perThread.sequenceIDs);
+ createCompoundFile(compoundFileName, perThread);
+ success = true;
} finally {
- indexWriter.readerPool.release(reader);
- }
- if (any) {
- indexWriter.checkpoint();
- }
-
- if (indexWriter.mergePolicy.useCompoundFile(indexWriter.segmentInfos, newSegment)) {
- // Now build compound file
- boolean success = false;
- try {
- createCompoundFile(newSegment.name, perThread);
- success = true;
- } finally {
- if (!success) {
- if (infoStream != null) {
- message("hit exception " +
- "reating compound file for newly flushed segment " + newSegment.name);
- }
- indexWriter.deleter.deleteFile(IndexFileNames.segmentFileName(newSegment.name, "",
- IndexFileNames.COMPOUND_FILE_EXTENSION));
+ if (!success) {
+ if (infoStream != null) {
+ message("hit exception " +
+ "reating compound file for newly flushed segment " + newSegment.name);
}
+ indexWriter.deleter.deleteFile(IndexFileNames.segmentFileName(newSegment.name, "",
+ IndexFileNames.COMPOUND_FILE_EXTENSION));
+ for (String file : perThread.flushState.flushedFiles) {
+ indexWriter.deleter.deleteFile(file);
+ }
+
}
-
- synchronized(indexWriter) {
- newSegment.setUseCompoundFile(true);
- indexWriter.checkpoint();
- // In case the files we just merged into a CFS were
- // not previously checkpointed:
- indexWriter.deleter.deleteNewFiles(perThread.closedFiles());
- }
+ }
+
+ for (String file : perThread.flushState.flushedFiles) {
+ indexWriter.deleter.deleteFile(file);
+ }
+
+ newSegment.setUseCompoundFile(true);
+
+ synchronized(openFiles) {
+ openFiles.remove(compoundFileName);
}
}
+
+ synchronized(openFiles) {
+ openFiles.removeAll(perThread.flushState.flushedFiles);
+ }
+
+ indexWriter.addNewSegment(newSegment);
}
// Returns true if an abort is in progress
@@ -400,6 +424,7 @@ final class DocumentsWriter {
if (perThread.getNumDocsInRAM() == maxBufferedDocs) {
flushSegment(perThread);
assert perThread.getNumDocsInRAM() == 0;
+
return true;
}
@@ -413,48 +438,57 @@ final class DocumentsWriter {
}
SegmentInfo newSegment = perThread.flush();
+ newSegment.dir = indexWriter.getDirectory();
- if (newSegment != null) {
- finishFlushedSegment(newSegment, perThread);
- return true;
- }
- return false;
+ finishFlushedSegment(newSegment, perThread);
+ return true;
}
void abort() throws IOException {
- threadPool.abort();
- try {
- try {
- abortedFiles = openFiles();
- } catch (Throwable t) {
- abortedFiles = null;
+ threadPool.abort(new DocumentsWriterThreadPool.AbortTask() {
+
+ @Override
+ void abort() throws IOException {
+ try {
+ abortedFiles = openFiles();
+ } catch (Throwable t) {
+ abortedFiles = null;
+ }
+
+ deletesInRAM.clear();
+ // nocommit
+ // deletesFlushed.clear();
+
+ openFiles.clear();
+ deletesInRAM.clear();
}
-
- deletesInRAM.clear();
- // nocommit
- // deletesFlushed.clear();
-
- openFiles.clear();
- } finally {
- threadPool.finishAbort();
- }
-
+ });
}
- final List<String> openFiles = new ArrayList<String>();
+ final Set<String> openFiles = new HashSet<String>();
private Collection<String> abortedFiles; // List of files that were written before last abort()
-
/*
* Returns Collection of files in use by this instance,
* including any flushed segments.
*/
@SuppressWarnings("unchecked")
- List<String> openFiles() {
+ private Collection<String> openFiles() {
synchronized(openFiles) {
- return (List<String>) ((ArrayList<String>) openFiles).clone();
+ return (Set<String>) ((HashSet<String>) openFiles).clone();
}
}
+ void addOpenFile(String file) {
+ synchronized(openFiles) {
+ openFiles.add(file);
+ }
+ }
+
+ void removeOpenFile(String file) {
+ synchronized(openFiles) {
+ openFiles.remove(file);
+ }
+ }
Collection<String> abortedFiles() {
return abortedFiles;
@@ -480,27 +514,29 @@ final class DocumentsWriter {
// }
boolean applyDeletes(SegmentInfos infos) throws IOException {
- synchronized(indexWriter) {
- if (!hasDeletes())
- return false;
-
- final long t0 = System.currentTimeMillis();
-
- if (infoStream != null) {
- message("apply " + deletesInRAM.getNumDeletes() + " buffered deletes on " +
- +infos.size() + " segments.");
- }
-
- final int infosEnd = infos.size();
-
- boolean any = false;
- for (int i = 0; i < infosEnd; i++) {
-
- // Make sure we never attempt to apply deletes to
- // segment in external dir
- assert infos.info(i).dir == directory;
-
- SegmentInfo si = infos.info(i);
+ if (!hasDeletes())
+ return false;
+
+ final long t0 = System.currentTimeMillis();
+
+ if (infoStream != null) {
+ message("apply " + deletesInRAM.getNumDeletes() + " buffered deletes on " +
+ infos.size() + " segments.");
+ }
+
+ final int infosEnd = infos.size();
+
+ boolean any = false;
+ for (int i = 0; i < infosEnd; i++) {
+
+ // Make sure we never attempt to apply deletes to
+ // segment in external dir
+ assert infos.info(i).dir == indexWriter.getDirectory();
+
+ SegmentInfo si = infos.info(i);
+ // we have to synchronize here, because we need a write lock on
+ // the segment in order to apply deletes
+ synchronized (indexWriter) {
SegmentReader reader = indexWriter.readerPool.get(si, false);
try {
any |= applyDeletes(reader, si.getMinSequenceID(), si.getMaxSequenceID(), null);
@@ -508,13 +544,13 @@ final class DocumentsWriter {
indexWriter.readerPool.release(reader);
}
}
-
- if (infoStream != null) {
- message("apply deletes took " + (System.currentTimeMillis() - t0) + " msec");
- }
-
- return any;
}
+
+ if (infoStream != null) {
+ message("apply deletes took " + (System.currentTimeMillis() - t0) + " msec");
+ }
+
+ return any;
}
// Apply buffered delete terms, queries and docIDs to the
@@ -642,9 +678,6 @@ final class DocumentsWriter {
}
void message(String message) {
- if (infoStream != null) {
- indexWriter.message("DW: " + message);
- }
+ indexWriter.message("DW: " + message);
}
-
}
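
The net effect of the DocumentsWriter changes above: open-file tracking moves off the per-thread path into a single synchronized Set, populated by the FilterDirectory wrapper around createOutput(), and maybeMerge() now only fires when a flush actually happened. A minimal, Lucene-independent sketch of the synchronized-set-with-snapshot idiom used here (class and method names are illustrative, not from the patch):

    import java.util.Collection;
    import java.util.HashSet;
    import java.util.Set;

    final class OpenFileTracker {
      private final Set<String> openFiles = new HashSet<String>();

      void add(String file) {
        synchronized (openFiles) {
          openFiles.add(file);
        }
      }

      void remove(String file) {
        synchronized (openFiles) {
          openFiles.remove(file);
        }
      }

      // Hand out a clone so callers can iterate without holding the lock.
      @SuppressWarnings("unchecked")
      Collection<String> snapshot() {
        synchronized (openFiles) {
          return (Set<String>) ((HashSet<String>) openFiles).clone();
        }
      }
    }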
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java Tue Jul 27 20:33:05 2010
@@ -1,15 +1,32 @@
package org.apache.lucene.index;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
import java.io.IOException;
import java.io.PrintStream;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.codecs.Codec;
import org.apache.lucene.search.Similarity;
import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.ArrayUtil;
public class DocumentsWriterPerThread {
@@ -84,6 +101,7 @@ public class DocumentsWriterPerThread {
* currently buffered docs. This resets our state,
* discarding any docs added since last flush. */
void abort() throws IOException {
+ assert aborting;
try {
if (infoStream != null) {
message("docWriter: now abort");
@@ -124,11 +142,11 @@ public class DocumentsWriterPerThread {
SegmentWriteState flushState;
long[] sequenceIDs = new long[8];
- final List<String> closedFiles = new ArrayList<String>();
long numBytesUsed;
public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent, IndexingChain indexingChain) {
+ parent.indexWriter.testPoint("DocumentsWriterPerThread.init start");
this.directory = directory;
this.parent = parent;
this.writer = parent.indexWriter;
@@ -141,7 +159,6 @@ public class DocumentsWriterPerThread {
if (consumer instanceof DocFieldProcessor) {
docFieldProcessor = (DocFieldProcessor) consumer;
}
-
}
void setAborting() {
@@ -288,33 +305,11 @@ public class DocumentsWriterPerThread {
return segment;
}
- @SuppressWarnings("unchecked")
- List<String> closedFiles() {
- return (List<String>) ((ArrayList<String>) closedFiles).clone();
- }
-
-
- void addOpenFile(String name) {
- synchronized(parent.openFiles) {
- assert !parent.openFiles.contains(name);
- parent.openFiles.add(name);
- }
- }
-
- void removeOpenFile(String name) {
- synchronized(parent.openFiles) {
- assert parent.openFiles.contains(name);
- parent.openFiles.remove(name);
- }
- closedFiles.add(name);
- }
-
void bytesUsed(long numBytes) {
ramAllocator.bytesUsed(numBytes);
}
void message(String message) {
- if (infoStream != null)
- writer.message("DW: " + message);
+ writer.message("DW: " + message);
}
}
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/DocumentsWriterThreadPool.java Tue Jul 27 20:33:05 2010
@@ -1,5 +1,22 @@
package org.apache.lucene.index;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.locks.Condition;
@@ -30,6 +47,10 @@ abstract class DocumentsWriterThreadPool
abstract T process(final Iterator<DocumentsWriterPerThread> threadsIterator) throws IOException;
}
+ public static abstract class AbortTask {
+ abstract void abort() throws IOException;
+ }
+
protected abstract static class ThreadState {
private DocumentsWriterPerThread perThread;
private boolean isIdle = true;
@@ -95,11 +116,29 @@ abstract class DocumentsWriterThreadPool
return true;
}
- void abort() throws IOException {
- pauseAllThreads();
- aborting = true;
- for (ThreadState state : allThreadStates) {
- state.perThread.abort();
+ void abort(AbortTask task) throws IOException {
+ lock.lock();
+ try {
+ if (!aborting) {
+ aborting = true;
+ pauseAllThreads();
+ for (ThreadState state : allThreadStates) {
+ state.perThread.aborting = true;
+ }
+
+ try {
+ for (ThreadState state : allThreadStates) {
+ state.perThread.abort();
+ }
+
+ task.abort();
+ } finally {
+ aborting = false;
+ resumeAllThreads();
+ }
+ }
+ } finally {
+ lock.unlock();
}
}
@@ -108,7 +147,7 @@ abstract class DocumentsWriterThreadPool
resumeAllThreads();
}
- public <T> T executeAllThreads(AllThreadsTask<T> task) throws IOException {
+ public <T> T executeAllThreads(DocumentsWriter documentsWriter, AllThreadsTask<T> task) throws IOException {
T result = null;
lock.lock();
@@ -120,19 +159,21 @@ abstract class DocumentsWriterThreadPool
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
-
- pauseAllThreads();
+
+ assert !globalLock;
globalLock = true;
+
+ pauseAllThreads();
+
} finally {
lock.unlock();
}
+ final ThreadState[] localAllThreads = allThreadStates;
// all threads are idle now
-
+ boolean success = false;
try {
- final ThreadState[] localAllThreads = allThreadStates;
-
result = task.process(new Iterator<DocumentsWriterPerThread>() {
int i = 0;
@@ -151,8 +192,18 @@ abstract class DocumentsWriterThreadPool
throw new UnsupportedOperationException("remove() not supported.");
}
});
+ success = true;
return result;
} finally {
+ boolean abort = false;
+ if (!success) {
+ for (ThreadState state : localAllThreads) {
+ if (state.perThread.aborting) {
+ abort = true;
+ }
+ }
+ }
+
lock.lock();
try {
try {
@@ -168,6 +219,10 @@ abstract class DocumentsWriterThreadPool
lock.unlock();
}
+ if (!aborting && abort) {
+ documentsWriter.abort();
+ }
+
}
}
@@ -182,13 +237,12 @@ abstract class DocumentsWriterThreadPool
} finally {
boolean abort = false;
if (!success && state.perThread.aborting) {
- state.perThread.aborting = false;
abort = true;
}
returnDocumentsWriterPerThread(state, task.doClearThreadBindings());
- if (abort) {
+ if (!aborting && abort) {
documentsWriter.abort();
}
}
@@ -222,13 +276,15 @@ abstract class DocumentsWriterThreadPool
ThreadState threadState = selectThreadState(Thread.currentThread(), documentsWriter, doc);
try {
- while (!threadState.isIdle || globalLock || aborting) {
+ while (!threadState.isIdle || globalLock || aborting || threadState.perThread.aborting) {
threadStateAvailable.await();
}
} catch (InterruptedException ie) {
throw new ThreadInterruptedException(ie);
}
+ assert threadState.isIdle;
+
threadState.isIdle = false;
threadState.start();
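
The reworked abort() above serializes aborts behind the pool lock: set the aborting flag, pause all threads, abort each per-thread state, run the caller-supplied AbortTask, and always resume in a finally block so a throwing cleanup cannot leave the pool paused. A minimal sketch of that pause/abort/resume discipline, assuming a simple flag-plus-Condition pool (illustrative names, far simpler than the real per-thread bookkeeping):

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    final class AbortableWorkerPool {
      interface AbortTask { void abort() throws Exception; }

      private final ReentrantLock lock = new ReentrantLock();
      private final Condition stateChanged = lock.newCondition();
      private boolean aborting = false;

      void abort(AbortTask task) throws Exception {
        lock.lock();
        try {
          if (aborting) {
            return;            // another abort is already in flight
          }
          aborting = true;     // keeps new work from starting
          try {
            task.abort();      // cleanup runs while the pool is paused
          } finally {
            aborting = false;  // always resume, even if cleanup threw
            stateChanged.signalAll();
          }
        } finally {
          lock.unlock();
        }
      }

      // Workers call this before picking up new work.
      void awaitWorkPermitted() throws InterruptedException {
        lock.lock();
        try {
          while (aborting) {
            stateChanged.await();
          }
        } finally {
          lock.unlock();
        }
      }
    }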
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexFileDeleter.java Tue Jul 27 20:33:05 2010
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Set;
import java.util.List;
import java.util.Map;
@@ -422,7 +423,7 @@ final class IndexFileDeleter {
public void checkpoint(SegmentInfos segmentInfos, boolean isCommit) throws IOException {
if (infoStream != null) {
- message("now checkpoint \"" + segmentInfos.getCurrentSegmentFileName() + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
+ message("now checkpoint \"" + segmentInfos + "\" [" + segmentInfos.size() + " segments " + "; isCommit = " + isCommit + "]");
}
// Try again now to delete any previously un-deletable
@@ -442,18 +443,6 @@ final class IndexFileDeleter {
// Decref files for commits that were deleted by the policy:
deleteCommits();
} else {
-
- final List<String> docWriterFiles;
- if (docWriter != null) {
- docWriterFiles = docWriter.openFiles();
- if (docWriterFiles != null)
- // We must incRef these files before decRef'ing
- // last files to make sure we don't accidentally
- // delete them:
- incRef(docWriterFiles);
- } else
- docWriterFiles = null;
-
// DecRef old files from the last checkpoint, if any:
int size = lastFiles.size();
if (size > 0) {
@@ -465,8 +454,6 @@ final class IndexFileDeleter {
// Save files so we can decr on next checkpoint/commit:
lastFiles.add(segmentInfos.files(directory, false));
- if (docWriterFiles != null)
- lastFiles.add(docWriterFiles);
}
}
Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexReaderPool.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexReaderPool.java?rev=979856&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexReaderPool.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexReaderPool.java Tue Jul 27 20:33:05 2010
@@ -0,0 +1,263 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.Directory;
+
+/** Holds shared SegmentReader instances. IndexWriter uses
+ * SegmentReaders for 1) applying deletes, 2) doing
+ * merges, 3) handing out a real-time reader. This pool
+ * reuses instances of the SegmentReaders in all these
+ * places if it is in "near real-time mode" (getReader()
+ * has been called on this instance). */
+public class IndexReaderPool {
+
+ private final Map<SegmentInfo,SegmentReader> readerMap = new HashMap<SegmentInfo,SegmentReader>();
+
+ private final Directory directory;
+ private final IndexWriterConfig config;
+ private final IndexWriter writer;
+
+ public IndexReaderPool(IndexWriter writer, Directory directory, IndexWriterConfig config) {
+ this.directory = directory;
+ this.config = config;
+ this.writer = writer;
+ }
+
+ /** Forcefully clear changes for the specified segments,
+ * and remove from the pool. This is called on successful merge. */
+ synchronized void clear(SegmentInfos infos) throws IOException {
+ if (infos == null) {
+ for (Map.Entry<SegmentInfo,SegmentReader> ent: readerMap.entrySet()) {
+ ent.getValue().hasChanges = false;
+ }
+ } else {
+ for (final SegmentInfo info: infos) {
+ if (readerMap.containsKey(info)) {
+ readerMap.get(info).hasChanges = false;
+ }
+ }
+ }
+ }
+
+ /**
+ * Release the segment reader (i.e. decRef it and close if there
+ * are no more references).
+ * @param sr
+ * @throws IOException
+ */
+ public synchronized void release(SegmentReader sr) throws IOException {
+ release(sr, false);
+ }
+
+ /**
+ * Release the segment reader (i.e. decRef it and close if there
+ * are no more references).
+ * @param sr
+ * @throws IOException
+ */
+ public synchronized void release(SegmentReader sr, boolean drop) throws IOException {
+
+ final boolean pooled = readerMap.containsKey(sr.getSegmentInfo());
+
+ assert !pooled | readerMap.get(sr.getSegmentInfo()) == sr;
+
+ // Drop caller's ref; for an external reader (not
+ // pooled), this decRef will close it
+ sr.decRef();
+
+ if (pooled && (drop || (!writer.poolReaders && sr.getRefCount() == 1))) {
+
+ // We are the last ref to this reader; since we're
+ // not pooling readers, we release it:
+ readerMap.remove(sr.getSegmentInfo());
+
+ // nocommit
+ //assert !sr.hasChanges || Thread.holdsLock(IndexWriter.this);
+
+ // Drop our ref -- this will commit any pending
+ // changes to the dir
+ boolean success = false;
+ try {
+ sr.close();
+ success = true;
+ } finally {
+ if (!success && sr.hasChanges) {
+ // Abandon the changes & retry closing:
+ sr.hasChanges = false;
+ try {
+ sr.close();
+ } catch (Throwable ignore) {
+ // Keep throwing original exception
+ }
+ }
+ }
+ }
+ }
+
+ /** Remove all our references to readers, and commits
+ * any pending changes. */
+ synchronized void close() throws IOException {
+ Iterator<Map.Entry<SegmentInfo,SegmentReader>> iter = readerMap.entrySet().iterator();
+ while (iter.hasNext()) {
+
+ Map.Entry<SegmentInfo,SegmentReader> ent = iter.next();
+
+ SegmentReader sr = ent.getValue();
+ if (sr.hasChanges) {
+ assert writer.infoIsLive(sr.getSegmentInfo());
+ sr.startCommit();
+ boolean success = false;
+ try {
+ sr.doCommit(null);
+ success = true;
+ } finally {
+ if (!success) {
+ sr.rollbackCommit();
+ }
+ }
+ }
+
+ iter.remove();
+
+ // NOTE: it is allowed that this decRef does not
+ // actually close the SR; this can happen when a
+ // near real-time reader is kept open after the
+ // IndexWriter instance is closed
+ sr.decRef();
+ }
+ }
+
+ /**
+ * Commit all segment readers in the pool.
+ * @throws IOException
+ */
+ synchronized void commit() throws IOException {
+ for (Map.Entry<SegmentInfo,SegmentReader> ent : readerMap.entrySet()) {
+
+ SegmentReader sr = ent.getValue();
+ if (sr.hasChanges) {
+ assert writer.infoIsLive(sr.getSegmentInfo());
+ sr.startCommit();
+ boolean success = false;
+ try {
+ sr.doCommit(null);
+ success = true;
+ } finally {
+ if (!success) {
+ sr.rollbackCommit();
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns a ref to a clone. NOTE: this clone is not
+ * enrolled in the pool, so you should simply close()
+ * it when you're done (ie, do not call release()).
+ */
+ public synchronized SegmentReader getReadOnlyClone(SegmentInfo info, boolean doOpenStores, int termInfosIndexDivisor) throws IOException {
+ SegmentReader sr = get(info, doOpenStores, BufferedIndexInput.BUFFER_SIZE, termInfosIndexDivisor);
+ try {
+ return (SegmentReader) sr.clone(true);
+ } finally {
+ sr.decRef();
+ }
+ }
+
+ /**
+ * Obtain a SegmentReader from the readerPool. The reader
+ * must be returned by calling {@link #release(SegmentReader)}
+ * @see #release(SegmentReader)
+ * @param info
+ * @param doOpenStores
+ * @throws IOException
+ */
+ public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores) throws IOException {
+ return get(info, doOpenStores, BufferedIndexInput.BUFFER_SIZE, config.getReaderTermsIndexDivisor());
+ }
+
+ /**
+ * Obtain a SegmentReader from the readerPool. The reader
+ * must be returned by calling {@link #release(SegmentReader)}
+ *
+ * @see #release(SegmentReader)
+ * @param info
+ * @param doOpenStores
+ * @param readBufferSize
+ * @param termsIndexDivisor
+ * @throws IOException
+ */
+ public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores, int readBufferSize, int termsIndexDivisor) throws IOException {
+
+ if (writer.poolReaders) {
+ readBufferSize = BufferedIndexInput.BUFFER_SIZE;
+ }
+
+ SegmentReader sr = readerMap.get(info);
+ if (sr == null) {
+ // TODO: we may want to avoid doing this while
+ // synchronized
+ // Returns a ref, which we xfer to readerMap:
+ sr = SegmentReader.get(false, info.dir, info, readBufferSize, doOpenStores, termsIndexDivisor, config.getCodecProvider());
+
+ if (info.dir == directory) {
+ // Only pool if reader is not external
+ readerMap.put(info, sr);
+ }
+ } else {
+ if (doOpenStores) {
+ sr.openDocStores();
+ }
+ if (termsIndexDivisor != -1) {
+ // If this reader was originally opened because we
+ // needed to merge it, we didn't load the terms
+ // index. But now, if the caller wants the terms
+ // index (eg because it's doing deletes, or an NRT
+ // reader is being opened) we ask the reader to
+ // load its terms index.
+ sr.loadTermsIndex(termsIndexDivisor);
+ }
+ }
+
+ // Return a ref to our caller
+ if (info.dir == directory) {
+ // Only incRef if we pooled (reader is not external)
+ sr.incRef();
+ }
+ return sr;
+ }
+
+ // Returns a ref
+ public synchronized SegmentReader getIfExists(SegmentInfo info) throws IOException {
+ SegmentReader sr = readerMap.get(info);
+ if (sr != null) {
+ sr.incRef();
+ }
+ return sr;
+ }
+}
+
+
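
The javadoc above spells out the contract: every reader obtained via get() must be handed back via release(), which decRefs and closes on the last reference. The patch itself follows this shape in DocumentsWriter.finishFlushedSegment(); a minimal usage sketch (the try/finally pairing is the point, the work in between is a placeholder):

    // 'pool' is an IndexReaderPool, 'info' a SegmentInfo from the surrounding code.
    SegmentReader reader = pool.get(info, false);  // returns an incRef'd reader
    try {
      // ... apply deletes, read postings, etc. ...
    } finally {
      pool.release(reader);  // decRef; closes the reader if this was the last ref
    }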
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/IndexWriter.java Tue Jul 27 20:33:05 2010
@@ -271,8 +271,7 @@ public class IndexWriter implements Clos
volatile SegmentInfos pendingCommit; // set when a commit is pending (after prepareCommit() & before commit())
volatile long pendingCommitChangeCount;
- // nocommit - private
- SegmentInfos segmentInfos = new SegmentInfos(); // the segments
+ private final SegmentInfos segmentInfos = new SegmentInfos(); // the segments
private DocumentsWriter docWriter;
//nocommit - private
@@ -304,7 +303,7 @@ public class IndexWriter implements Clos
private int flushCount;
private int flushDeletesCount;
- final ReaderPool readerPool = new ReaderPool();
+ final IndexReaderPool readerPool;
// This is a "write once" variable (like the organic dye
// on a DVD-R that may or may not be heated by a laser and
@@ -315,7 +314,7 @@ public class IndexWriter implements Clos
// reuse SegmentReader instances internally for applying
// deletes, doing merges, and reopening near real-time
// readers.
- private volatile boolean poolReaders;
+ volatile boolean poolReaders;
// The instance that was passed to the constructor. It is saved only in order
// to allow users to query an IndexWriter settings.
@@ -427,246 +426,21 @@ public class IndexWriter implements Clos
return r;
}
}
-
- /** Holds shared SegmentReader instances. IndexWriter uses
- * SegmentReaders for 1) applying deletes, 2) doing
- * merges, 3) handing out a real-time reader. This pool
- * reuses instances of the SegmentReaders in all these
- * places if it is in "near real-time mode" (getReader()
- * has been called on this instance). */
-
- class ReaderPool {
-
- private final Map<SegmentInfo,SegmentReader> readerMap = new HashMap<SegmentInfo,SegmentReader>();
-
- /** Forcefully clear changes for the specified segments,
- * and remove from the pool. This is called on successful merge. */
- synchronized void clear(SegmentInfos infos) throws IOException {
- if (infos == null) {
- for (Map.Entry<SegmentInfo,SegmentReader> ent: readerMap.entrySet()) {
- ent.getValue().hasChanges = false;
- }
- } else {
- for (final SegmentInfo info: infos) {
- if (readerMap.containsKey(info)) {
- readerMap.get(info).hasChanges = false;
- }
- }
- }
- }
-
- // used only by asserts
- public synchronized boolean infoIsLive(SegmentInfo info) {
- int idx = segmentInfos.indexOf(info);
- assert idx != -1;
- assert segmentInfos.get(idx) == info;
- return true;
- }
-
- public synchronized SegmentInfo mapToLive(SegmentInfo info) {
- int idx = segmentInfos.indexOf(info);
- if (idx != -1) {
- info = segmentInfos.get(idx);
- }
- return info;
- }
-
- /**
- * Release the segment reader (i.e. decRef it and close if there
- * are no more references.
- * @param sr
- * @throws IOException
- */
- public synchronized void release(SegmentReader sr) throws IOException {
- release(sr, false);
- }
-
- /**
- * Release the segment reader (i.e. decRef it and close if there
- * are no more references.
- * @param sr
- * @throws IOException
- */
- public synchronized void release(SegmentReader sr, boolean drop) throws IOException {
-
- final boolean pooled = readerMap.containsKey(sr.getSegmentInfo());
-
- assert !pooled | readerMap.get(sr.getSegmentInfo()) == sr;
-
- // Drop caller's ref; for an external reader (not
- // pooled), this decRef will close it
- sr.decRef();
-
- if (pooled && (drop || (!poolReaders && sr.getRefCount() == 1))) {
-
- // We are the last ref to this reader; since we're
- // not pooling readers, we release it:
- readerMap.remove(sr.getSegmentInfo());
-
- assert !sr.hasChanges || Thread.holdsLock(IndexWriter.this);
-
- // Drop our ref -- this will commit any pending
- // changes to the dir
- boolean success = false;
- try {
- sr.close();
- success = true;
- } finally {
- if (!success && sr.hasChanges) {
- // Abandon the changes & retry closing:
- sr.hasChanges = false;
- try {
- sr.close();
- } catch (Throwable ignore) {
- // Keep throwing original exception
- }
- }
- }
- }
- }
-
- /** Remove all our references to readers, and commits
- * any pending changes. */
- synchronized void close() throws IOException {
- Iterator<Map.Entry<SegmentInfo,SegmentReader>> iter = readerMap.entrySet().iterator();
- while (iter.hasNext()) {
-
- Map.Entry<SegmentInfo,SegmentReader> ent = iter.next();
-
- SegmentReader sr = ent.getValue();
- if (sr.hasChanges) {
- assert infoIsLive(sr.getSegmentInfo());
- sr.startCommit();
- boolean success = false;
- try {
- sr.doCommit(null);
- success = true;
- } finally {
- if (!success) {
- sr.rollbackCommit();
- }
- }
- }
-
- iter.remove();
-
- // NOTE: it is allowed that this decRef does not
- // actually close the SR; this can happen when a
- // near real-time reader is kept open after the
- // IndexWriter instance is closed
- sr.decRef();
- }
- }
-
- /**
- * Commit all segment reader in the pool.
- * @throws IOException
- */
- synchronized void commit() throws IOException {
- for (Map.Entry<SegmentInfo,SegmentReader> ent : readerMap.entrySet()) {
-
- SegmentReader sr = ent.getValue();
- if (sr.hasChanges) {
- assert infoIsLive(sr.getSegmentInfo());
- sr.startCommit();
- boolean success = false;
- try {
- sr.doCommit(null);
- success = true;
- } finally {
- if (!success) {
- sr.rollbackCommit();
- }
- }
- }
- }
- }
-
- /**
- * Returns a ref to a clone. NOTE: this clone is not
- * enrolled in the pool, so you should simply close()
- * it when you're done (ie, do not call release()).
- */
- public synchronized SegmentReader getReadOnlyClone(SegmentInfo info, boolean doOpenStores, int termInfosIndexDivisor) throws IOException {
- SegmentReader sr = get(info, doOpenStores, BufferedIndexInput.BUFFER_SIZE, termInfosIndexDivisor);
- try {
- return (SegmentReader) sr.clone(true);
- } finally {
- sr.decRef();
- }
- }
-
- /**
- * Obtain a SegmentReader from the readerPool. The reader
- * must be returned by calling {@link #release(SegmentReader)}
- * @see #release(SegmentReader)
- * @param info
- * @param doOpenStores
- * @throws IOException
- */
- public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores) throws IOException {
- return get(info, doOpenStores, BufferedIndexInput.BUFFER_SIZE, config.getReaderTermsIndexDivisor());
- }
-
- /**
- * Obtain a SegmentReader from the readerPool. The reader
- * must be returned by calling {@link #release(SegmentReader)}
- *
- * @see #release(SegmentReader)
- * @param info
- * @param doOpenStores
- * @param readBufferSize
- * @param termsIndexDivisor
- * @throws IOException
- */
- public synchronized SegmentReader get(SegmentInfo info, boolean doOpenStores, int readBufferSize, int termsIndexDivisor) throws IOException {
-
- if (poolReaders) {
- readBufferSize = BufferedIndexInput.BUFFER_SIZE;
- }
-
- SegmentReader sr = readerMap.get(info);
- if (sr == null) {
- // TODO: we may want to avoid doing this while
- // synchronized
- // Returns a ref, which we xfer to readerMap:
- sr = SegmentReader.get(false, info.dir, info, readBufferSize, doOpenStores, termsIndexDivisor, codecs);
-
- if (info.dir == directory) {
- // Only pool if reader is not external
- readerMap.put(info, sr);
- }
- } else {
- if (doOpenStores) {
- sr.openDocStores();
- }
- if (termsIndexDivisor != -1) {
- // If this reader was originally opened because we
- // needed to merge it, we didn't load the terms
- // index. But now, if the caller wants the terms
- // index (eg because it's doing deletes, or an NRT
- // reader is being opened) we ask the reader to
- // load its terms index.
- sr.loadTermsIndex(termsIndexDivisor);
- }
- }
-
- // Return a ref to our caller
- if (info.dir == directory) {
- // Only incRef if we pooled (reader is not external)
- sr.incRef();
- }
- return sr;
- }
-
- // Returns a ref
- public synchronized SegmentReader getIfExists(SegmentInfo info) throws IOException {
- SegmentReader sr = readerMap.get(info);
- if (sr != null) {
- sr.incRef();
- }
- return sr;
+
+ // used only by asserts
+ public synchronized boolean infoIsLive(SegmentInfo info) {
+ int idx = segmentInfos.indexOf(info);
+ assert idx != -1;
+ assert segmentInfos.get(idx) == info;
+ return true;
+ }
+
+ public synchronized SegmentInfo mapToLive(SegmentInfo info) {
+ int idx = segmentInfos.indexOf(info);
+ if (idx != -1) {
+ info = segmentInfos.get(idx);
}
+ return info;
}
/**
@@ -934,6 +708,8 @@ public class IndexWriter implements Clos
poolReaders = conf.getReaderPooling();
+ this.readerPool = new IndexReaderPool(this, directory, config);
+
OpenMode mode = conf.getOpenMode();
boolean create;
if (mode == OpenMode.CREATE) {
@@ -1784,7 +1560,7 @@ public class IndexWriter implements Clos
if (infoStream != null) {
message("hit exception updating document");
}
-
+
synchronized (this) {
// If docWriter has some aborted files that were
// never incref'd, then we clean them up here
@@ -2419,6 +2195,15 @@ public class IndexWriter implements Clos
deleter.checkpoint(segmentInfos, false);
}
+ synchronized void addNewSegment(SegmentInfo newSegment) throws IOException {
+ segmentInfos.add(newSegment);
+ checkpoint();
+ }
+
+ boolean useCompoundFile(SegmentInfo segmentInfo) {
+ return mergePolicy.useCompoundFile(segmentInfos, segmentInfo);
+ }
+
private synchronized void resetMergeExceptions() {
mergeExceptions = new ArrayList<MergePolicy.OneMerge>();
mergeGen++;
@@ -2793,7 +2578,7 @@ public class IndexWriter implements Clos
if (pendingCommit != null) {
try {
if (infoStream != null)
- message("commit: pendingCommit != null");
+ message("commit: pendingCommit != null");
pendingCommit.finishCommit(directory);
if (infoStream != null)
message("commit: wrote segments file \"" + pendingCommit.getCurrentSegmentFileName() + "\"");
@@ -2828,25 +2613,37 @@ public class IndexWriter implements Clos
protected final void flush(boolean triggerMerge, boolean flushDeletes) throws CorruptIndexException, IOException {
// We can be called during close, when closing==true, so we must pass false to ensureOpen:
ensureOpen(false);
- if (doFlush(flushDeletes) && triggerMerge)
+
+ doBeforeFlush();
+
+ if (flushDeletes) {
+ if (applyDeletes()) {
+ checkpoint();
+ }
+ }
+ boolean maybeMerge = false;
+ boolean success = false;
+ try {
+ maybeMerge = docWriter.flushAllThreads(flushDeletes) && triggerMerge;
+ success = true;
+ } finally {
+ if (!success) {
+ synchronized (this) {
+ // If docWriter has some aborted files that were
+ // never incref'd, then we clean them up here
+ final Collection<String> files = docWriter.abortedFiles();
+ if (files != null) {
+ deleter.deleteNewFiles(files);
+ }
+ }
+ }
+ }
+
+ doAfterFlush();
+
+ if (maybeMerge) {
maybeMerge();
- }
-
- // TODO: this method should not have to be entirely
- // synchronized, ie, merges should be allowed to commit
- // even while a flush is happening
- private synchronized final boolean doFlush(boolean flushDeletes) throws CorruptIndexException, IOException {
- return docWriter.flushAllThreads(flushDeletes);
- // nocommit
-// try {
-// try {
-// return doFlushInternal(flushDocStores, flushDeletes);
-// } finally {
-// docWriter.balanceRAM();
-// }
-// } finally {
-// docWriter.clearFlushPending();
-// }
+ }
}
/** Expert: Return the total size of all index files currently cached in memory.
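
Several hunks in this patch, including the rewritten flush() above, rely on the success-flag idiom: cleanup runs only on the failure path, and the in-flight exception keeps propagating because the finally block neither catches nor rethrows it. A generic sketch, with placeholder names standing in for docWriter.flushAllThreads() and deleter.deleteNewFiles():

    boolean success = false;
    try {
      doFlushWork();        // may throw; placeholder for the real flush call
      success = true;       // reached only when no exception escaped
    } finally {
      if (!success) {
        deleteNewFiles();   // failure-path cleanup; the original exception still propagates
      }
    }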
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/StoredFieldsWriter.java Tue Jul 27 20:33:05 2010
@@ -79,9 +79,6 @@ final class StoredFieldsWriter {
state.flushedFiles.add(fieldsName);
state.flushedFiles.add(fieldsIdxName);
- docWriter.removeOpenFile(fieldsName);
- docWriter.removeOpenFile(fieldsIdxName);
-
if (4+((long) state.numDocs)*8 != state.directory.fileLength(fieldsIdxName)) {
throw new RuntimeException("after flush: fdx size mismatch: " + state.numDocs + " docs vs " + state.directory.fileLength(fieldsIdxName) + " length in bytes of " + fieldsIdxName + " file exists?=" + state.directory.fileExists(fieldsIdxName));
}
@@ -96,8 +93,6 @@ final class StoredFieldsWriter {
fieldsWriter = new FieldsWriter(docWriter.directory,
segment,
fieldInfos);
- docWriter.addOpenFile(IndexFileNames.segmentFileName(segment, "", IndexFileNames.FIELDS_EXTENSION));
- docWriter.addOpenFile(IndexFileNames.segmentFileName(segment, "", IndexFileNames.FIELDS_INDEX_EXTENSION));
lastDocID = 0;
}
}
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java Tue Jul 27 20:33:05 2010
@@ -75,10 +75,6 @@ final class TermVectorsTermsWriter exten
state.flushedFiles.add(fldName);
state.flushedFiles.add(docName);
- docWriter.removeOpenFile(idxName);
- docWriter.removeOpenFile(fldName);
- docWriter.removeOpenFile(docName);
-
lastDocID = 0;
}
@@ -105,7 +101,7 @@ final class TermVectorsTermsWriter exten
}
}
- void initTermVectorsWriter() throws IOException {
+ private final void initTermVectorsWriter() throws IOException {
if (tvx == null) {
final String segment = docWriter.getSegment();
@@ -128,10 +124,6 @@ final class TermVectorsTermsWriter exten
tvd.writeInt(TermVectorsReader.FORMAT_CURRENT);
tvf.writeInt(TermVectorsReader.FORMAT_CURRENT);
- docWriter.addOpenFile(idxName);
- docWriter.addOpenFile(fldName);
- docWriter.addOpenFile(docName);
-
lastDocID = 0;
}
}
@@ -146,7 +138,8 @@ final class TermVectorsTermsWriter exten
fill(docState.docID);
// Append term vectors to the real outputs:
- tvx.writeLong(tvd.getFilePointer());
+ long pointer = tvd.getFilePointer();
+ tvx.writeLong(pointer);
tvx.writeLong(tvf.getFilePointer());
tvd.writeVInt(numVectorFields);
if (numVectorFields > 0) {
@@ -198,7 +191,7 @@ final class TermVectorsTermsWriter exten
}
lastDocID = 0;
-
+ reset();
}
int numVectorFields;
Modified: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/index/TermsHash.java Tue Jul 27 20:33:05 2010
@@ -82,9 +82,12 @@ final class TermsHash extends InvertedDo
@Override
public void abort() {
reset();
- consumer.abort();
- if (nextTermsHash != null) {
- nextTermsHash.abort();
+ try {
+ consumer.abort();
+ } finally {
+ if (nextTermsHash != null) {
+ nextTermsHash.abort();
+ }
}
}
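
The TermsHash change above is a small exception-safety fix: if consumer.abort() throws, the chained nextTermsHash.abort() still runs because it moved into a finally block. A generic sketch of the idiom (illustrative names; note that under pre-Java-7 semantics an exception thrown from the finally block replaces the original one):

    void abortChain(Runnable consumer, Runnable next) {
      try {
        consumer.run();      // may throw
      } finally {
        if (next != null) {
          next.run();        // runs even when consumer.run() threw
        }
      }
    }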
Added: lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/store/FilterDirectory.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/store/FilterDirectory.java?rev=979856&view=auto
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/store/FilterDirectory.java (added)
+++ lucene/dev/branches/realtime_search/lucene/src/java/org/apache/lucene/store/FilterDirectory.java Tue Jul 27 20:33:05 2010
@@ -0,0 +1,111 @@
+package org.apache.lucene.store;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.Collection;
+
+public abstract class FilterDirectory extends Directory {
+ private final Directory delegate;
+
+ public FilterDirectory(Directory delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public String[] listAll() throws IOException {
+ return delegate.listAll();
+ }
+
+ @Override
+ public boolean fileExists(String name) throws IOException {
+ return delegate.fileExists(name);
+ }
+
+ @Override
+ public long fileModified(String name) throws IOException {
+ return delegate.fileModified(name);
+ }
+
+ @Override
+ public void touchFile(String name) throws IOException {
+ delegate.touchFile(name);
+ }
+
+ @Override
+ public void deleteFile(String name) throws IOException {
+ delegate.deleteFile(name);
+ }
+
+ @Override
+ public long fileLength(String name) throws IOException {
+ return delegate.fileLength(name);
+ }
+
+ @Override
+ public IndexOutput createOutput(String name) throws IOException {
+ return delegate.createOutput(name);
+ }
+
+ @Override
+ public IndexInput openInput(String name) throws IOException {
+ return delegate.openInput(name);
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ }
+
+ @Deprecated @Override
+ public void sync(String name) throws IOException { // TODO 4.0 kill me
+ delegate.sync(name);
+ }
+
+ public void sync(Collection<String> names) throws IOException { // TODO 4.0 make me abstract
+ delegate.sync(names);
+ }
+
+ public IndexInput openInput(String name, int bufferSize) throws IOException {
+ return delegate.openInput(name, bufferSize);
+ }
+
+ public Lock makeLock(String name) {
+ return delegate.makeLock(name);
+ }
+
+ public void clearLock(String name) throws IOException {
+ delegate.clearLock(name);
+ }
+
+ public void setLockFactory(LockFactory lockFactory) {
+ delegate.setLockFactory(lockFactory);
+ }
+
+ public LockFactory getLockFactory() {
+ return delegate.getLockFactory();
+ }
+
+ public String getLockID() {
+ return delegate.getLockID();
+ }
+
+ public void copy(Directory to, String src, String dest) throws IOException {
+ delegate.copy(to, src, dest);
+ }
+}
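
FilterDirectory delegates every Directory operation, so a subclass can intercept a single method, which is exactly how DocumentsWriter tracks created files earlier in this commit. A minimal usage sketch against the API shown above; wrapping a RAMDirectory here is an assumption for illustration:

    import java.io.IOException;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FilterDirectory;
    import org.apache.lucene.store.IndexOutput;
    import org.apache.lucene.store.RAMDirectory;

    public class TrackingDirectoryExample {
      public static void main(String[] args) throws IOException {
        final Set<String> created = Collections.synchronizedSet(new HashSet<String>());
        Directory dir = new FilterDirectory(new RAMDirectory()) {
          @Override
          public IndexOutput createOutput(String name) throws IOException {
            created.add(name);               // record the file name first
            return super.createOutput(name); // then delegate
          }
        };
        IndexOutput out = dir.createOutput("_0.test");
        out.writeInt(42);
        out.close();
        System.out.println("created: " + created);  // prints: created: [_0.test]
        dir.close();
      }
    }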
Modified: lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestConcurrentMergeScheduler.java Tue Jul 27 20:33:05 2010
@@ -48,7 +48,7 @@ public class TestConcurrentMergeSchedule
if (doFail && Thread.currentThread().getName().equals("main")) {
StackTraceElement[] trace = new Exception().getStackTrace();
for (int i = 0; i < trace.length; i++) {
- if ("doFlush".equals(trace[i].getMethodName())) {
+ if ("flush".equals(trace[i].getMethodName())) {
hitExc = true;
throw new IOException("now failing during flush");
}
Modified: lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriter.java Tue Jul 27 20:33:05 2010
@@ -2351,7 +2351,7 @@ public class TestIndexWriter extends Luc
StackTraceElement[] trace = new Exception().getStackTrace();
for (int i = 0; i < trace.length; i++) {
if ("abort".equals(trace[i].getMethodName()) ||
- "flushDocument".equals(trace[i].getMethodName())) {
+ "finishDocument".equals(trace[i].getMethodName())) {
if (onlyOnce)
doFail = false;
//System.out.println(Thread.currentThread().getName() + ": now fail");
@@ -2419,7 +2419,7 @@ public class TestIndexWriter extends Luc
for(int i=0;i<NUM_THREADS;i++) {
threads[i].join();
- assertTrue("hit unexpected Throwable", threads[i].error == null);
+ assertTrue("hit unexpected Throwable ", threads[i].error == null);
}
boolean success = false;
@@ -2481,7 +2481,8 @@ public class TestIndexWriter extends Luc
if (doFail) {
StackTraceElement[] trace = new Exception().getStackTrace();
for (int i = 0; i < trace.length; i++) {
- if ("closeDocStore".equals(trace[i].getMethodName())) {
+ if ("finishDocument".equals(trace[i].getMethodName())
+ && "org.apache.lucene.index.DocFieldProcessor".equals(trace[i].getClassName())) {
if (onlyOnce)
doFail = false;
throw new IOException("now failing on purpose");
@@ -3007,7 +3008,7 @@ public class TestIndexWriter extends Luc
@Override
boolean testPoint(String name) {
- if (doFail && name.equals("DocumentsWriter.ThreadState.init start"))
+ if (doFail && name.equals("DocumentsWriterPerThread.init start"))
throw new RuntimeException("intentionally failing");
return true;
}
@@ -3020,7 +3021,7 @@ public class TestIndexWriter extends Luc
Document doc = new Document();
doc.add(new Field("field", "a field", Field.Store.YES,
Field.Index.ANALYZED));
- w.addDocument(doc);
+
w.doFail = true;
try {
w.addDocument(doc);
@@ -4945,7 +4946,7 @@ public class TestIndexWriter extends Luc
doc.add(new Field("c", "val", Store.YES, Index.ANALYZED, TermVector.WITH_POSITIONS_OFFSETS));
writer.addDocument(doc);
// The second document should cause a flush.
- assertTrue("flush should have occurred and files created", dir.listAll().length > 5);
+ assertTrue("flush should have occurred and files created", dir.listAll().length > 0);
// After rollback, IW should remove all files
writer.rollback();
Modified: lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestIndexWriterExceptions.java Tue Jul 27 20:33:05 2010
@@ -189,8 +189,9 @@ public class TestIndexWriterExceptions e
threads[i].join();
for(int i=0;i<NUM_THREADS;i++)
- if (threads[i].failure != null)
+ if (threads[i].failure != null) {
fail("thread " + threads[i].getName() + ": hit unexpected failure");
+ }
writer.commit();
Modified: lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestNRTReaderWithThreads.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestNRTReaderWithThreads.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestNRTReaderWithThreads.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestNRTReaderWithThreads.java Tue Jul 27 20:33:05 2010
@@ -30,8 +30,7 @@ public class TestNRTReaderWithThreads ex
Random random = new Random();
HeavyAtomicInt seq = new HeavyAtomicInt(1);
- // nocommit
- public void _testIndexing() throws Exception {
+ public void testIndexing() throws Exception {
Directory mainDir = new MockRAMDirectory();
IndexWriter writer = new IndexWriter(mainDir, new IndexWriterConfig(TEST_VERSION_CURRENT, new MockAnalyzer()).setMaxBufferedDocs(10));
((LogMergePolicy) writer.getConfig().getMergePolicy()).setMergeFactor(2);
Modified: lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestThreadedOptimize.java
URL: http://svn.apache.org/viewvc/lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestThreadedOptimize.java?rev=979856&r1=979855&r2=979856&view=diff
==============================================================================
--- lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestThreadedOptimize.java (original)
+++ lucene/dev/branches/realtime_search/lucene/src/test/org/apache/lucene/index/TestThreadedOptimize.java Tue Jul 27 20:33:05 2010
@@ -136,8 +136,7 @@ public class TestThreadedOptimize extend
Run above stress test against RAMDirectory and then
FSDirectory.
*/
- // nocommit
- public void _testThreadedOptimize() throws Exception {
+ public void testThreadedOptimize() throws Exception {
Directory directory = new MockRAMDirectory();
runTest(directory, new SerialMergeScheduler());
runTest(directory, new ConcurrentMergeScheduler());