You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-commits@lucene.apache.org by mi...@apache.org on 2008/05/06 20:41:12 UTC
svn commit: r653878 - in /lucene/java/trunk: ./
src/java/org/apache/lucene/index/ src/java/org/apache/lucene/store/
src/test/org/apache/lucene/index/ src/test/org/apache/lucene/store/
Author: mikemccand
Date: Tue May 6 11:41:10 2008
New Revision: 653878
URL: http://svn.apache.org/viewvc?rev=653878&view=rev
Log:
LUCENE-1274: add preparCommit() to IW to do phase 1 of 2-phase commit
Added:
lucene/java/trunk/src/test/org/apache/lucene/index/TestTransactions.java
Modified:
lucene/java/trunk/CHANGES.txt
lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
lucene/java/trunk/src/java/org/apache/lucene/index/FieldsWriter.java
lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java
lucene/java/trunk/src/java/org/apache/lucene/index/SegmentInfos.java
lucene/java/trunk/src/java/org/apache/lucene/store/ChecksumIndexOutput.java
lucene/java/trunk/src/test/org/apache/lucene/index/TestAtomicUpdate.java
lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java
lucene/java/trunk/src/test/org/apache/lucene/index/TestStressIndexing.java
lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMDirectory.java
Modified: lucene/java/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/java/trunk/CHANGES.txt?rev=653878&r1=653877&r2=653878&view=diff
==============================================================================
--- lucene/java/trunk/CHANGES.txt (original)
+++ lucene/java/trunk/CHANGES.txt Tue May 6 11:41:10 2008
@@ -97,6 +97,12 @@
8. LUCENE-1267: Added numDocs() and maxDoc() to IndexWriter;
deprecated docCount(). (Mike McCandless)
+
+ 9. LUCENE-1274: Added new prepareCommit() method to IndexWriter,
+ which does phase 1 of a 2-phase commit (commit() does phase 2).
+ This is needed when you want to update an index as part of a
+ transaction involving external resources (eg a database). Also
+ deprecated abort(), renaming it to rollback(). (Mike McCandless)
New features
Modified: lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java?rev=653878&r1=653877&r2=653878&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java Tue May 6 11:41:10 2008
@@ -138,6 +138,9 @@
public void merge(IndexWriter writer)
throws CorruptIndexException, IOException {
+ // TODO: enable this once we are on JRE 1.5
+ // assert !Thread.holdsLock(writer);
+
this.writer = writer;
initMergeThreadPriority();
Modified: lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=653878&r1=653877&r2=653878&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java Tue May 6 11:41:10 2008
@@ -348,11 +348,12 @@
abortCount++;
}
- /** Called if we hit an exception when adding docs,
- * flushing, etc. This resets our state, discarding any
- * docs added since last flush. If ae is non-null, it
- * contains the root cause exception (which we re-throw
- * after we are done aborting). */
+ /** Called if we hit an exception at a bad time (when
+ * updating the index files) and must discard all
+ * currently buffered docs. This resets our state,
+ * discarding any docs added since last flush. If ae is
+ * non-null, it contains the root cause exception (which
+ * we re-throw after we are done aborting). */
synchronized void abort(AbortException ae) throws IOException {
// Anywhere that throws an AbortException must first
Modified: lucene/java/trunk/src/java/org/apache/lucene/index/FieldsWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/FieldsWriter.java?rev=653878&r1=653877&r2=653878&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/FieldsWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/FieldsWriter.java Tue May 6 11:41:10 2008
@@ -64,8 +64,16 @@
success = true;
} finally {
if (!success) {
- close();
- d.deleteFile(fieldsName);
+ try {
+ close();
+ } catch (Throwable t) {
+ // Suppress so we keep throwing the original exception
+ }
+ try {
+ d.deleteFile(fieldsName);
+ } catch (Throwable t) {
+ // Suppress so we keep throwing the original exception
+ }
}
}
@@ -77,9 +85,20 @@
success = true;
} finally {
if (!success) {
- close();
- d.deleteFile(fieldsName);
- d.deleteFile(indexName);
+ try {
+ close();
+ } catch (IOException ioe) {
+ }
+ try {
+ d.deleteFile(fieldsName);
+ } catch (Throwable t) {
+ // Suppress so we keep throwing the original exception
+ }
+ try {
+ d.deleteFile(indexName);
+ } catch (Throwable t) {
+ // Suppress so we keep throwing the original exception
+ }
}
}
Modified: lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java?rev=653878&r1=653877&r2=653878&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java Tue May 6 11:41:10 2008
@@ -306,6 +306,9 @@
private SegmentInfos rollbackSegmentInfos; // segmentInfos we will fallback to if the commit fails
private HashMap rollbackSegments;
+ volatile SegmentInfos pendingCommit; // set when a commit is pending (after prepareCommit() & before commit())
+ volatile long pendingCommitChangeCount;
+
private SegmentInfos localRollbackSegmentInfos; // segmentInfos we will fallback to if the commit fails
private boolean localAutoCommit; // saved autoCommit during local transaction
private int localFlushedDocCount; // saved docWriter.getFlushedDocCount during local transaction
@@ -364,12 +367,13 @@
infoStream.println("IW " + messageID + " [" + Thread.currentThread().getName() + "]: " + message);
}
- private synchronized void setMessageID() {
+ private synchronized void setMessageID(PrintStream infoStream) {
if (infoStream != null && messageID == -1) {
synchronized(MESSAGE_ID_LOCK) {
messageID = MESSAGE_ID++;
}
}
+ this.infoStream = infoStream;
}
/**
@@ -1082,9 +1086,8 @@
this.closeDir = closeDir;
directory = d;
analyzer = a;
- this.infoStream = defaultInfoStream;
+ setMessageID(defaultInfoStream);
this.maxFieldLength = maxFieldLength;
- setMessageID();
if (create) {
// Clear the write lock in case it's leftover:
@@ -1496,8 +1499,7 @@
*/
public void setInfoStream(PrintStream infoStream) {
ensureOpen();
- this.infoStream = infoStream;
- setMessageID();
+ setMessageID(infoStream);
docWriter.setInfoStream(infoStream);
deleter.setInfoStream(infoStream);
if (infoStream != null)
@@ -1672,7 +1674,7 @@
if (infoStream != null)
message("now call final commit()");
- commit(true, 0);
+ commit(0);
if (infoStream != null)
message("at close: " + segString());
@@ -2571,7 +2573,7 @@
if (autoCommit) {
boolean success = false;
try {
- commit(true, 0);
+ commit(0);
success = true;
} finally {
if (!success) {
@@ -2588,24 +2590,40 @@
}
/**
+ * @deprecated Please use {@link #rollback} instead.
+ */
+ public void abort() throws IOException {
+ rollback();
+ }
+
+ /**
* Close the <code>IndexWriter</code> without committing
* any of the changes that have occurred since it was
* opened. This removes any temporary files that had been
* created, after which the state of the index will be the
* same as it was when this writer was first opened. This
* can only be called when this IndexWriter was opened
- * with <code>autoCommit=false</code>.
+ * with <code>autoCommit=false</code>. This also clears a
+ * previous call to {@link #prepareCommit}.
* @throws IllegalStateException if this is called when
* the writer was opened with <code>autoCommit=true</code>.
* @throws IOException if there is a low-level IO error
*/
- public void abort() throws IOException {
+ public void rollback() throws IOException {
ensureOpen();
if (autoCommit)
throw new IllegalStateException("abort() can only be called when IndexWriter was opened with autoCommit=false");
boolean doClose;
synchronized(this) {
+
+ if (pendingCommit != null) {
+ pendingCommit.rollbackCommit(directory);
+ deleter.decRef(pendingCommit);
+ pendingCommit = null;
+ notifyAll();
+ }
+
// Ensure that only one thread actually gets to do the closing:
if (!closing) {
doClose = true;
@@ -3113,10 +3131,54 @@
flush(true, false, true);
}
+ /** <p>Expert: prepare for commit. This does the first
+ * phase of 2-phase commit. You can only call this when
+ * autoCommit is false. This method does all steps
+ * necessary to commit changes since this writer was
+ * opened: flushes pending added and deleted docs, syncs
+ * the index files, writes most of next segments_N file.
+ * After calling this you must call either {@link
+ * #commit()} to finish the commit, or {@link
+ * #rollback()} to revert the commit and undo all changes
+ * done since the writer was opened.</p>
+ *
+ * You can also just call {@link #commit()} directly
+ * without prepareCommit first in which case that method
+ * will internally call prepareCommit.
+ */
+ public final void prepareCommit() throws CorruptIndexException, IOException {
+ prepareCommit(false);
+ }
+
+ private final void prepareCommit(boolean internal) throws CorruptIndexException, IOException {
+
+ if (hitOOM)
+ throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot commit");
+
+ if (autoCommit && !internal)
+ throw new IllegalStateException("this method can only be used when autoCommit is false");
+
+ if (!autoCommit && pendingCommit != null)
+ throw new IllegalStateException("prepareCommit was already called with no corresponding call to commit");
+
+ message("prepareCommit: flush");
+
+ flush(true, true, true);
+
+ startCommit(0);
+ }
+
+ private void commit(long sizeInBytes) throws IOException {
+ startCommit(sizeInBytes);
+ finishCommit();
+ }
+
/**
- * <p>Commits all pending updates (added & deleted documents)
- * to the index, and syncs all referenced index files,
- * such that a reader will see the changes. Note that
+ * <p>Commits all pending updates (added & deleted
+ * documents) to the index, and syncs all referenced index
+ * files, such that a reader will see the changes and the
+ * index updates will survive an OS or machine crash or
+ * power loss (though, see the note below). Note that
* this does not wait for any running background merges to
* finish. This may be a costly operation, so you should
* test the cost in your application and do it only when
@@ -3135,12 +3197,38 @@
* consistency on such devices. </p>
*/
public final void commit() throws CorruptIndexException, IOException {
- commit(true);
+
+ message("commit: start");
+
+ if (autoCommit || pendingCommit == null) {
+ message("commit: now prepare");
+ prepareCommit(true);
+ } else
+ message("commit: already prepared");
+
+ finishCommit();
}
- private final void commit(boolean triggerMerges) throws CorruptIndexException, IOException {
- flush(triggerMerges, true, true);
- commit(true, 0);
+ private synchronized final void finishCommit() throws CorruptIndexException, IOException {
+
+ if (pendingCommit != null) {
+ try {
+ message("commit: pendingCommit != null");
+ pendingCommit.finishCommit(directory);
+ lastCommitChangeCount = pendingCommitChangeCount;
+ segmentInfos.updateGeneration(pendingCommit);
+ setRollbackSegmentInfos();
+ deleter.checkpoint(pendingCommit, true);
+ } finally {
+ deleter.decRef(pendingCommit);
+ pendingCommit = null;
+ notifyAll();
+ }
+
+ } else
+ message("commit: pendingCommit == null; skip");
+
+ message("commit: done");
}
/**
@@ -3176,8 +3264,7 @@
// when flushing a segment; otherwise deletes may become
// visible before their corresponding added document
// from an updateDocument call
- if (autoCommit)
- flushDeletes = true;
+ flushDeletes |= autoCommit;
// Returns true if docWriter is currently aborting, in
// which case we skip flushing this segment
@@ -3935,7 +4022,7 @@
synchronized(this) {
size = merge.info.sizeInBytes();
}
- commit(false, size);
+ commit(size);
}
success = false;
@@ -3988,7 +4075,7 @@
synchronized(this) {
size = merge.info.sizeInBytes();
}
- commit(false, size);
+ commit(size);
}
return mergedDocCount;
@@ -4151,13 +4238,13 @@
}
/** Walk through all files referenced by the current
- * segmentInfos, minus flushes, and ask the Directory to
- * sync each file, if it wasn't already. If that
- * succeeds, then we write a new segments_N file & sync
- * that. */
- private void commit(boolean skipWait, long sizeInBytes) throws IOException {
+ * segmentInfos and ask the Directory to sync each file,
+ * if it wasn't already. If that succeeds, then we
+ * prepare a new segments_N file but do not fully commit
+ * it. */
+ private void startCommit(long sizeInBytes) throws IOException {
- assert testPoint("startCommit");
+ assert testPoint("startStartCommit");
if (hitOOM)
return;
@@ -4165,9 +4252,9 @@
try {
if (infoStream != null)
- message("start commit() skipWait=" + skipWait + " sizeInBytes=" + sizeInBytes);
+ message("startCommit(): start sizeInBytes=" + sizeInBytes);
- if (!skipWait)
+ if (sizeInBytes > 0)
syncPause(sizeInBytes);
SegmentInfos toSync = null;
@@ -4179,7 +4266,7 @@
if (changeCount == lastCommitChangeCount) {
if (infoStream != null)
- message(" skip commit(): no changes pending");
+ message(" skip startCommit(): no changes pending");
return;
}
@@ -4189,15 +4276,17 @@
// threads can be doing this at once, if say a large
// merge and a small merge finish at the same time:
+ if (infoStream != null)
+ message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount);
+
toSync = (SegmentInfos) segmentInfos.clone();
deleter.incRef(toSync, false);
myChangeCount = changeCount;
}
- if (infoStream != null)
- message("commit index=" + segString(toSync));
+ assert testPoint("midStartCommit");
- assert testPoint("midCommit");
+ boolean setPending = false;
try {
@@ -4237,54 +4326,72 @@
break;
}
- assert testPoint("midCommit2");
-
+ assert testPoint("midStartCommit2");
+
synchronized(this) {
// If someone saved a newer version of segments file
// since I first started syncing my version, I can
// safely skip saving myself since I've been
// superseded:
- if (myChangeCount > lastCommitChangeCount) {
-
+ if (myChangeCount > lastCommitChangeCount && (pendingCommit == null || myChangeCount > pendingCommitChangeCount)) {
+
+ // Wait now for any current pending commit to complete:
+ while(pendingCommit != null) {
+ message("wait for existing pendingCommit to finish...");
+ try {
+ wait();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
if (segmentInfos.getGeneration() > toSync.getGeneration())
toSync.updateGeneration(segmentInfos);
boolean success = false;
try {
- toSync.commit(directory);
+
+ // Exception here means nothing is prepared
+ // (this method unwinds everything it did on
+ // an exception)
+ try {
+ toSync.prepareCommit(directory);
+ } finally {
+ // Have our master segmentInfos record the
+ // generations we just prepared. We do this
+ // on error or success so we don't
+ // double-write a segments_N file.
+ segmentInfos.updateGeneration(toSync);
+ }
+
+ assert pendingCommit == null;
+ setPending = true;
+ pendingCommit = toSync;
+ pendingCommitChangeCount = myChangeCount;
success = true;
} finally {
- // Have our master segmentInfos record the
- // generations we just sync'd
- segmentInfos.updateGeneration(toSync);
if (!success)
message("hit exception committing segments file");
}
-
- message("commit complete");
-
- lastCommitChangeCount = myChangeCount;
-
- deleter.checkpoint(toSync, true);
- setRollbackSegmentInfos();
} else
message("sync superseded by newer infos");
}
message("done all syncs");
- assert testPoint("midCommitSuccess");
+ assert testPoint("midStartCommitSuccess");
} finally {
synchronized(this) {
- deleter.decRef(toSync);
+ if (!setPending)
+ deleter.decRef(toSync);
}
}
} catch (OutOfMemoryError oom) {
hitOOM = true;
throw oom;
}
- assert testPoint("finishCommit");
+ assert testPoint("finishStartCommit");
}
/**
@@ -4377,11 +4484,11 @@
// Used only by assert for testing. Current points:
// startDoFlush
// startCommitMerge
- // startCommit
- // midCommit
- // midCommit2
- // midCommitSuccess
- // finishCommit
+ // startStartCommit
+ // midStartCommit
+ // midStartCommit2
+ // midStartCommitSuccess
+ // finishStartCommit
// startCommitMergeDeletes
// startMergeInit
// startApplyDeletes
Modified: lucene/java/trunk/src/java/org/apache/lucene/index/SegmentInfos.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/SegmentInfos.java?rev=653878&r1=653877&r2=653878&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/SegmentInfos.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/SegmentInfos.java Tue May 6 11:41:10 2008
@@ -274,6 +274,10 @@
}.run();
}
+ // Only non-null after prepareCommit has been called and
+ // before finishCommit is called
+ ChecksumIndexOutput pendingOutput;
+
private final void write(Directory directory) throws IOException {
String segmentFileName = getNextSegmentFileName();
@@ -298,53 +302,27 @@
for (int i = 0; i < size(); i++) {
info(i).write(output);
}
- final long checksum = output.getChecksum();
- output.writeLong(checksum);
+ output.prepareCommit();
success = true;
+ pendingOutput = output;
} finally {
- boolean success2 = false;
- try {
- if (!success) {
- // We hit an exception above; try to close the file
- // but suppress any exception:
- try {
- output.close();
- success2 = true;
- } catch (Throwable t) {
- // Suppress so we keep throwing the original exception
- }
- } else {
+ if (!success) {
+ // We hit an exception above; try to close the file
+ // but suppress any exception:
+ try {
output.close();
- success2 = true;
+ } catch (Throwable t) {
+ // Suppress so we keep throwing the original exception
}
- } finally {
- if (!success || !success2) {
- try {
- // Try not to leave a truncated segments_N file in
- // the index:
- directory.deleteFile(segmentFileName);
- } catch (Throwable t) {
- // Suppress so we keep throwing the original exception
- }
+ try {
+ // Try not to leave a truncated segments_N file in
+ // the index:
+ directory.deleteFile(segmentFileName);
+ } catch (Throwable t) {
+ // Suppress so we keep throwing the original exception
}
}
}
-
- try {
- IndexOutput genOutput = directory.createOutput(IndexFileNames.SEGMENTS_GEN);
- try {
- genOutput.writeInt(FORMAT_LOCKLESS);
- genOutput.writeLong(generation);
- genOutput.writeLong(generation);
- } finally {
- genOutput.close();
- }
- } catch (IOException e) {
- // It's OK if we fail to write this file since it's
- // used only as one of the retry fallbacks.
- }
-
- lastGeneration = generation;
}
/**
@@ -355,7 +333,7 @@
public Object clone() {
SegmentInfos sis = (SegmentInfos) super.clone();
for(int i=0;i<sis.size();i++) {
- sis.setElementAt(((SegmentInfo) sis.elementAt(i)).clone(), i);
+ sis.setElementAt(sis.info(i).clone(), i);
}
return sis;
}
@@ -739,45 +717,73 @@
// Carry over generation numbers from another SegmentInfos
void updateGeneration(SegmentInfos other) {
- assert other.generation > generation;
lastGeneration = other.lastGeneration;
generation = other.generation;
version = other.version;
}
- /** Writes & syncs to the Directory dir, taking care to
- * remove the segments file on exception */
- public final void commit(Directory dir) throws IOException {
- boolean success = false;
- try {
- write(dir);
- success = true;
- } finally {
- if (!success) {
- // Must carefully compute fileName from "generation"
- // since lastGeneration isn't incremented:
+ public final void rollbackCommit(Directory dir) throws IOException {
+ if (pendingOutput != null) {
+ try {
+ pendingOutput.close();
+ } catch (Throwable t) {
+ // Suppress so we keep throwing the original exception
+ // in our caller
+ }
+
+ // Must carefully compute fileName from "generation"
+ // since lastGeneration isn't incremented:
+ try {
final String segmentFileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS,
"",
generation);
- try {
- dir.deleteFile(segmentFileName);
- } catch (Throwable t) {
- // Suppress so we keep throwing the original exception
- }
+ dir.deleteFile(segmentFileName);
+ } catch (Throwable t) {
+ // Suppress so we keep throwing the original exception
+ // in our caller
}
+ pendingOutput = null;
+ }
+ }
+
+ /** Call this to start a commit. This writes the new
+ * segments file, but writes an invalid checksum at the
+ * end, so that it is not visible to readers. Once this
+ * is called you must call {@link #finishCommit} to complete
+ * the commit or {@link #rollbackCommit} to abort it. */
+ public final void prepareCommit(Directory dir) throws IOException {
+ if (pendingOutput != null)
+ throw new IllegalStateException("prepareCommit was already called");
+ write(dir);
+ }
+
+ public final void finishCommit(Directory dir) throws IOException {
+ if (pendingOutput == null)
+ throw new IllegalStateException("prepareCommit was not called");
+ boolean success = false;
+ try {
+ pendingOutput.finishCommit();
+ pendingOutput.close();
+ pendingOutput = null;
+ success = true;
+ } finally {
+ if (!success)
+ rollbackCommit(dir);
}
// NOTE: if we crash here, we have left a segments_N
// file in the directory in a possibly corrupt state (if
// some bytes made it to stable storage and others
- // didn't). But, the segments_N file now includes
- // checksum at the end, which should catch this case.
- // So when a reader tries to read it, it will throw a
+ // didn't). But, the segments_N file includes checksum
+ // at the end, which should catch this case. So when a
+ // reader tries to read it, it will throw a
// CorruptIndexException, which should cause the retry
// logic in SegmentInfos to kick in and load the last
// good (previous) segments_N-1 file.
- final String fileName = getCurrentSegmentFileName();
+ final String fileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS,
+ "",
+ generation);
success = false;
try {
dir.sync(fileName);
@@ -791,5 +797,28 @@
}
}
}
+
+ lastGeneration = generation;
+
+ try {
+ IndexOutput genOutput = dir.createOutput(IndexFileNames.SEGMENTS_GEN);
+ try {
+ genOutput.writeInt(FORMAT_LOCKLESS);
+ genOutput.writeLong(generation);
+ genOutput.writeLong(generation);
+ } finally {
+ genOutput.close();
+ }
+ } catch (Throwable t) {
+ // It's OK if we fail to write this file since it's
+ // used only as one of the retry fallbacks.
+ }
+ }
+
+ /** Writes & syncs to the Directory dir, taking care to
+ * remove the segments file on exception */
+ public final void commit(Directory dir) throws IOException {
+ prepareCommit(dir);
+ finishCommit(dir);
}
}
Modified: lucene/java/trunk/src/java/org/apache/lucene/store/ChecksumIndexOutput.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/store/ChecksumIndexOutput.java?rev=653878&r1=653877&r2=653878&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/store/ChecksumIndexOutput.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/store/ChecksumIndexOutput.java Tue May 6 11:41:10 2008
@@ -62,6 +62,30 @@
throw new RuntimeException("not allowed");
}
+ /**
+ * Starts but does not complete the commit of this file (=
+ * writing of the final checksum at the end). After this
+ * is called must call {@link #finishCommit} and the
+ * {@link #close} to complete the commit.
+ */
+ public void prepareCommit() throws IOException {
+ final long checksum = getChecksum();
+ // Intentionally write a mismatched checksum. This is
+ // because we want to 1) test, as best we can, that we
+ // are able to write a long to the file, but 2) not
+ // actually "commit" the file yet. This (prepare
+ // commit) is phase 1 of a two-phase commit.
+ final long pos = main.getFilePointer();
+ main.writeLong(checksum-1);
+ main.flush();
+ main.seek(pos);
+ }
+
+ /** See {@link #prepareCommit} */
+ public void finishCommit() throws IOException {
+ main.writeLong(getChecksum());
+ }
+
public long length() throws IOException {
return main.length();
}
Modified: lucene/java/trunk/src/test/org/apache/lucene/index/TestAtomicUpdate.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/index/TestAtomicUpdate.java?rev=653878&r1=653877&r2=653878&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/index/TestAtomicUpdate.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/index/TestAtomicUpdate.java Tue May 6 11:41:10 2008
@@ -68,6 +68,7 @@
count++;
}
} catch (Throwable e) {
+ System.out.println(Thread.currentThread().getName() + ": exc");
e.printStackTrace(System.out);
failed = true;
}
@@ -111,11 +112,7 @@
public void doWork() throws Throwable {
IndexReader r = IndexReader.open(directory);
- try {
- assertEquals(100, r.numDocs());
- } catch (Throwable t) {
- throw t;
- }
+ assertEquals(100, r.numDocs());
r.close();
}
}
@@ -141,6 +138,10 @@
}
writer.commit();
+ IndexReader r = IndexReader.open(directory);
+ assertEquals(100, r.numDocs());
+ r.close();
+
IndexerThread indexerThread = new IndexerThread(writer, threads);
threads[0] = indexerThread;
indexerThread.start();
Modified: lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java?rev=653878&r1=653877&r2=653878&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java Tue May 6 11:41:10 2008
@@ -3302,7 +3302,7 @@
boolean isCommit = false;
boolean isDelete = false;
for (int i = 0; i < trace.length; i++) {
- if ("org.apache.lucene.index.SegmentInfos".equals(trace[i].getClassName()) && "commit".equals(trace[i].getMethodName()))
+ if ("org.apache.lucene.index.SegmentInfos".equals(trace[i].getClassName()) && "prepareCommit".equals(trace[i].getMethodName()))
isCommit = true;
if ("org.apache.lucene.store.MockRAMDirectory".equals(trace[i].getClassName()) && "deleteFile".equals(trace[i].getMethodName()))
isDelete = true;
@@ -3603,4 +3603,124 @@
s.close();
dir.close();
}
+
+ // LUCENE-1274: test writer.prepareCommit()
+ public void testPrepareCommit() throws IOException {
+ Directory dir = new MockRAMDirectory();
+
+ IndexWriter writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
+ writer.setMaxBufferedDocs(2);
+ writer.setMergeFactor(5);
+
+ for (int i = 0; i < 23; i++)
+ addDoc(writer);
+
+ IndexReader reader = IndexReader.open(dir);
+ assertEquals(0, reader.numDocs());
+
+ writer.prepareCommit();
+
+ IndexReader reader2 = IndexReader.open(dir);
+ assertEquals(0, reader2.numDocs());
+
+ writer.commit();
+
+ IndexReader reader3 = reader.reopen();
+ assertEquals(0, reader.numDocs());
+ assertEquals(0, reader2.numDocs());
+ assertEquals(23, reader3.numDocs());
+ reader.close();
+ reader2.close();
+
+ for (int i = 0; i < 17; i++)
+ addDoc(writer);
+
+ assertEquals(23, reader3.numDocs());
+ reader3.close();
+ reader = IndexReader.open(dir);
+ assertEquals(23, reader.numDocs());
+ reader.close();
+
+ writer.prepareCommit();
+
+ reader = IndexReader.open(dir);
+ assertEquals(23, reader.numDocs());
+ reader.close();
+
+ writer.commit();
+ reader = IndexReader.open(dir);
+ assertEquals(40, reader.numDocs());
+ reader.close();
+ writer.close();
+ dir.close();
+ }
+
+ // LUCENE-1274: test writer.prepareCommit()
+ public void testPrepareCommitRollback() throws IOException {
+ MockRAMDirectory dir = new MockRAMDirectory();
+ dir.setPreventDoubleWrite(false);
+
+ IndexWriter writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
+
+ writer.setMaxBufferedDocs(2);
+ writer.setMergeFactor(5);
+
+ for (int i = 0; i < 23; i++)
+ addDoc(writer);
+
+ IndexReader reader = IndexReader.open(dir);
+ assertEquals(0, reader.numDocs());
+
+ writer.prepareCommit();
+
+ IndexReader reader2 = IndexReader.open(dir);
+ assertEquals(0, reader2.numDocs());
+
+ writer.rollback();
+
+ IndexReader reader3 = reader.reopen();
+ assertEquals(0, reader.numDocs());
+ assertEquals(0, reader2.numDocs());
+ assertEquals(0, reader3.numDocs());
+ reader.close();
+ reader2.close();
+
+ writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
+ for (int i = 0; i < 17; i++)
+ addDoc(writer);
+
+ assertEquals(0, reader3.numDocs());
+ reader3.close();
+ reader = IndexReader.open(dir);
+ assertEquals(0, reader.numDocs());
+ reader.close();
+
+ writer.prepareCommit();
+
+ reader = IndexReader.open(dir);
+ assertEquals(0, reader.numDocs());
+ reader.close();
+
+ writer.commit();
+ reader = IndexReader.open(dir);
+ assertEquals(17, reader.numDocs());
+ reader.close();
+ writer.close();
+ dir.close();
+ }
+
+ // LUCENE-1274
+ public void testPrepareCommitNoChanges() throws IOException {
+ MockRAMDirectory dir = new MockRAMDirectory();
+
+ IndexWriter writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
+ writer.prepareCommit();
+ writer.commit();
+ writer.close();
+
+ IndexReader reader = IndexReader.open(dir);
+ assertEquals(0, reader.numDocs());
+ reader.close();
+ dir.close();
+ }
}
Modified: lucene/java/trunk/src/test/org/apache/lucene/index/TestStressIndexing.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/index/TestStressIndexing.java?rev=653878&r1=653877&r2=653878&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/index/TestStressIndexing.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/index/TestStressIndexing.java Tue May 6 11:41:10 2008
@@ -53,6 +53,7 @@
count++;
}
} catch (Throwable e) {
+ System.out.println(Thread.currentThread() + ": exc");
e.printStackTrace(System.out);
failed = true;
}
Added: lucene/java/trunk/src/test/org/apache/lucene/index/TestTransactions.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/index/TestTransactions.java?rev=653878&view=auto
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/index/TestTransactions.java (added)
+++ lucene/java/trunk/src/test/org/apache/lucene/index/TestTransactions.java Tue May 6 11:41:10 2008
@@ -0,0 +1,217 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.Random;
+import org.apache.lucene.store.*;
+import org.apache.lucene.util.*;
+import org.apache.lucene.analysis.*;
+import org.apache.lucene.document.*;
+
+public class TestTransactions extends LuceneTestCase
+{
+ private static final Random RANDOM = new Random();
+ private static volatile boolean doFail;
+
+ private class RandomFailure extends MockRAMDirectory.Failure {
+ public void eval(MockRAMDirectory dir) throws IOException {
+ if (TestTransactions.doFail && RANDOM.nextInt() % 10 <= 3)
+ throw new IOException("now failing randomly but on purpose");
+ }
+ }
+
+ private static abstract class TimedThread extends Thread {
+ boolean failed;
+ private static int RUN_TIME_SEC = 6;
+ private TimedThread[] allThreads;
+
+ abstract public void doWork() throws Throwable;
+
+ TimedThread(TimedThread[] threads) {
+ this.allThreads = threads;
+ }
+
+ public void run() {
+ final long stopTime = System.currentTimeMillis() + 1000*RUN_TIME_SEC;
+
+ try {
+ while(System.currentTimeMillis() < stopTime && !anyErrors())
+ doWork();
+ } catch (Throwable e) {
+ System.out.println(Thread.currentThread() + ": exc");
+ e.printStackTrace(System.out);
+ failed = true;
+ }
+ }
+
+ private boolean anyErrors() {
+ for(int i=0;i<allThreads.length;i++)
+ if (allThreads[i] != null && allThreads[i].failed)
+ return true;
+ return false;
+ }
+ }
+
+ private static class IndexerThread extends TimedThread {
+ Directory dir1;
+ Directory dir2;
+ Object lock;
+ int nextID;
+
+ public IndexerThread(Object lock, Directory dir1, Directory dir2, TimedThread[] threads) {
+ super(threads);
+ this.lock = lock;
+ this.dir1 = dir1;
+ this.dir2 = dir2;
+ }
+
+ public void doWork() throws Throwable {
+
+ IndexWriter writer1 = new IndexWriter(dir1, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
+ writer1.setMaxBufferedDocs(3);
+ writer1.setMergeFactor(2);
+ ((ConcurrentMergeScheduler) writer1.getMergeScheduler()).setSuppressExceptions();
+
+ IndexWriter writer2 = new IndexWriter(dir2, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
+ // Intentionally use different params so flush/merge
+ // happen @ different times
+ writer2.setMaxBufferedDocs(2);
+ writer2.setMergeFactor(3);
+ ((ConcurrentMergeScheduler) writer2.getMergeScheduler()).setSuppressExceptions();
+
+ update(writer1);
+ update(writer2);
+
+ TestTransactions.doFail = true;
+ try {
+ synchronized(lock) {
+ try {
+ writer1.prepareCommit();
+ } catch (Throwable t) {
+ writer1.rollback();
+ writer2.rollback();
+ return;
+ }
+ try {
+ writer2.prepareCommit();
+ } catch (Throwable t) {
+ writer1.rollback();
+ writer2.rollback();
+ return;
+ }
+
+ writer1.commit();
+ writer2.commit();
+ }
+ } finally {
+ TestTransactions.doFail = false;
+ }
+
+ writer1.close();
+ writer2.close();
+ }
+
+ public void update(IndexWriter writer) throws IOException {
+ // Add 10 docs:
+ for(int j=0; j<10; j++) {
+ Document d = new Document();
+ int n = RANDOM.nextInt();
+ d.add(new Field("id", Integer.toString(nextID++), Field.Store.YES, Field.Index.UN_TOKENIZED));
+ d.add(new Field("contents", English.intToEnglish(n), Field.Store.NO, Field.Index.TOKENIZED));
+ writer.addDocument(d);
+ }
+
+ // Delete 5 docs:
+ int deleteID = nextID-1;
+ for(int j=0; j<5; j++) {
+ writer.deleteDocuments(new Term("id", ""+deleteID));
+ deleteID -= 2;
+ }
+ }
+ }
+
+ private static class SearcherThread extends TimedThread {
+ Directory dir1;
+ Directory dir2;
+ Object lock;
+
+ public SearcherThread(Object lock, Directory dir1, Directory dir2, TimedThread[] threads) {
+ super(threads);
+ this.lock = lock;
+ this.dir1 = dir1;
+ this.dir2 = dir2;
+ }
+
+ public void doWork() throws Throwable {
+ IndexReader r1, r2;
+ synchronized(lock) {
+ r1 = IndexReader.open(dir1);
+ r2 = IndexReader.open(dir2);
+ }
+ if (r1.numDocs() != r2.numDocs())
+ throw new RuntimeException("doc counts differ: r1=" + r1.numDocs() + " r2=" + r2.numDocs());
+ r1.close();
+ r2.close();
+ }
+ }
+
+ public void initIndex(Directory dir) throws Throwable {
+ IndexWriter writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
+ for(int j=0; j<7; j++) {
+ Document d = new Document();
+ int n = RANDOM.nextInt();
+ d.add(new Field("contents", English.intToEnglish(n), Field.Store.NO, Field.Index.TOKENIZED));
+ writer.addDocument(d);
+ }
+ writer.close();
+ }
+
+ public void testTransactions() throws Throwable {
+ MockRAMDirectory dir1 = new MockRAMDirectory();
+ MockRAMDirectory dir2 = new MockRAMDirectory();
+ dir1.setPreventDoubleWrite(false);
+ dir2.setPreventDoubleWrite(false);
+ dir1.failOn(new RandomFailure());
+ dir2.failOn(new RandomFailure());
+
+ initIndex(dir1);
+ initIndex(dir2);
+
+ TimedThread[] threads = new TimedThread[3];
+ int numThread = 0;
+
+ IndexerThread indexerThread = new IndexerThread(this, dir1, dir2, threads);
+ threads[numThread++] = indexerThread;
+ indexerThread.start();
+
+ SearcherThread searcherThread1 = new SearcherThread(this, dir1, dir2, threads);
+ threads[numThread++] = searcherThread1;
+ searcherThread1.start();
+
+ SearcherThread searcherThread2 = new SearcherThread(this, dir1, dir2, threads);
+ threads[numThread++] = searcherThread2;
+ searcherThread2.start();
+
+ for(int i=0;i<numThread;i++)
+ threads[i].join();
+
+ for(int i=0;i<numThread;i++)
+ assertTrue(!((TimedThread) threads[i]).failed);
+ }
+}
Modified: lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMDirectory.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMDirectory.java?rev=653878&r1=653877&r2=653878&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMDirectory.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMDirectory.java Tue May 6 11:41:10 2008
@@ -209,7 +209,7 @@
throw new IOException("cannot createOutput after crash");
init();
synchronized(openFiles) {
- if (preventDoubleWrite && createdFiles.contains(name))
+ if (preventDoubleWrite && createdFiles.contains(name) && !name.equals("segments.gen"))
throw new IOException("file \"" + name + "\" was already written to");
if (noDeleteOpenFile && openFiles.containsKey(name))
throw new IOException("MockRAMDirectory: file \"" + name + "\" is still open: cannot overwrite");