You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-commits@lucene.apache.org by mi...@apache.org on 2008/05/06 20:41:12 UTC

svn commit: r653878 - in /lucene/java/trunk: ./ src/java/org/apache/lucene/index/ src/java/org/apache/lucene/store/ src/test/org/apache/lucene/index/ src/test/org/apache/lucene/store/

Author: mikemccand
Date: Tue May  6 11:41:10 2008
New Revision: 653878

URL: http://svn.apache.org/viewvc?rev=653878&view=rev
Log:
LUCENE-1274: add preparCommit() to IW to do phase 1 of 2-phase commit

Added:
    lucene/java/trunk/src/test/org/apache/lucene/index/TestTransactions.java
Modified:
    lucene/java/trunk/CHANGES.txt
    lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
    lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
    lucene/java/trunk/src/java/org/apache/lucene/index/FieldsWriter.java
    lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java
    lucene/java/trunk/src/java/org/apache/lucene/index/SegmentInfos.java
    lucene/java/trunk/src/java/org/apache/lucene/store/ChecksumIndexOutput.java
    lucene/java/trunk/src/test/org/apache/lucene/index/TestAtomicUpdate.java
    lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java
    lucene/java/trunk/src/test/org/apache/lucene/index/TestStressIndexing.java
    lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMDirectory.java

Modified: lucene/java/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/java/trunk/CHANGES.txt?rev=653878&r1=653877&r2=653878&view=diff
==============================================================================
--- lucene/java/trunk/CHANGES.txt (original)
+++ lucene/java/trunk/CHANGES.txt Tue May  6 11:41:10 2008
@@ -97,6 +97,12 @@
 
  8. LUCENE-1267: Added numDocs() and maxDoc() to IndexWriter;
     deprecated docCount().  (Mike McCandless)
+
+ 9. LUCENE-1274: Added new prepareCommit() method to IndexWriter,
+    which does phase 1 of a 2-phase commit (commit() does phase 2).
+    This is needed when you want to update an index as part of a
+    transaction involving external resources (eg a database).  Also
+    deprecated abort(), renaming it to rollback().  (Mike McCandless)
 	
 New features
 

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java?rev=653878&r1=653877&r2=653878&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java Tue May  6 11:41:10 2008
@@ -138,6 +138,9 @@
   public void merge(IndexWriter writer)
     throws CorruptIndexException, IOException {
 
+    // TODO: enable this once we are on JRE 1.5
+    // assert !Thread.holdsLock(writer);
+
     this.writer = writer;
 
     initMergeThreadPriority();

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java?rev=653878&r1=653877&r2=653878&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/DocumentsWriter.java Tue May  6 11:41:10 2008
@@ -348,11 +348,12 @@
     abortCount++;
   }
 
-  /** Called if we hit an exception when adding docs,
-   *  flushing, etc.  This resets our state, discarding any
-   *  docs added since last flush.  If ae is non-null, it
-   *  contains the root cause exception (which we re-throw
-   *  after we are done aborting). */
+  /** Called if we hit an exception at a bad time (when
+   *  updating the index files) and must discard all
+   *  currently buffered docs.  This resets our state,
+   *  discarding any docs added since last flush.  If ae is
+   *  non-null, it contains the root cause exception (which
+   *  we re-throw after we are done aborting). */
   synchronized void abort(AbortException ae) throws IOException {
 
     // Anywhere that throws an AbortException must first

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/FieldsWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/FieldsWriter.java?rev=653878&r1=653877&r2=653878&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/FieldsWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/FieldsWriter.java Tue May  6 11:41:10 2008
@@ -64,8 +64,16 @@
           success = true;
         } finally {
           if (!success) {
-            close();
-            d.deleteFile(fieldsName);
+            try {
+              close();
+            } catch (Throwable t) {
+              // Suppress so we keep throwing the original exception
+            }
+            try {
+              d.deleteFile(fieldsName);
+            } catch (Throwable t) {
+              // Suppress so we keep throwing the original exception
+            }
           }
         }
 
@@ -77,9 +85,20 @@
           success = true;
         } finally {
           if (!success) {
-            close();
-            d.deleteFile(fieldsName);
-            d.deleteFile(indexName);
+            try {
+              close();
+            } catch (IOException ioe) {
+            }
+            try {
+              d.deleteFile(fieldsName);
+            } catch (Throwable t) {
+              // Suppress so we keep throwing the original exception
+            }
+            try {
+              d.deleteFile(indexName);
+            } catch (Throwable t) {
+              // Suppress so we keep throwing the original exception
+            }
           }
         }
 

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java?rev=653878&r1=653877&r2=653878&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/IndexWriter.java Tue May  6 11:41:10 2008
@@ -306,6 +306,9 @@
   private SegmentInfos rollbackSegmentInfos;      // segmentInfos we will fallback to if the commit fails
   private HashMap rollbackSegments;
 
+  volatile SegmentInfos pendingCommit;            // set when a commit is pending (after prepareCommit() & before commit())
+  volatile long pendingCommitChangeCount;
+
   private SegmentInfos localRollbackSegmentInfos;      // segmentInfos we will fallback to if the commit fails
   private boolean localAutoCommit;                // saved autoCommit during local transaction
   private int localFlushedDocCount;               // saved docWriter.getFlushedDocCount during local transaction
@@ -364,12 +367,13 @@
       infoStream.println("IW " + messageID + " [" + Thread.currentThread().getName() + "]: " + message);
   }
 
-  private synchronized void setMessageID() {
+  private synchronized void setMessageID(PrintStream infoStream) {
     if (infoStream != null && messageID == -1) {
       synchronized(MESSAGE_ID_LOCK) {
         messageID = MESSAGE_ID++;
       }
     }
+    this.infoStream = infoStream;
   }
 
   /**
@@ -1082,9 +1086,8 @@
     this.closeDir = closeDir;
     directory = d;
     analyzer = a;
-    this.infoStream = defaultInfoStream;
+    setMessageID(defaultInfoStream);
     this.maxFieldLength = maxFieldLength;
-    setMessageID();
 
     if (create) {
       // Clear the write lock in case it's leftover:
@@ -1496,8 +1499,7 @@
    */
   public void setInfoStream(PrintStream infoStream) {
     ensureOpen();
-    this.infoStream = infoStream;
-    setMessageID();
+    setMessageID(infoStream);
     docWriter.setInfoStream(infoStream);
     deleter.setInfoStream(infoStream);
     if (infoStream != null)
@@ -1672,7 +1674,7 @@
       if (infoStream != null)
         message("now call final commit()");
 
-      commit(true, 0);
+      commit(0);
 
       if (infoStream != null)
         message("at close: " + segString());
@@ -2571,7 +2573,7 @@
     if (autoCommit) {
       boolean success = false;
       try {
-        commit(true, 0);
+        commit(0);
         success = true;
       } finally {
         if (!success) {
@@ -2588,24 +2590,40 @@
   }
 
   /**
+   * @deprecated Please use {@link #rollback} instead.
+   */
+  public void abort() throws IOException {
+    rollback();
+  }
+
+  /**
    * Close the <code>IndexWriter</code> without committing
    * any of the changes that have occurred since it was
    * opened. This removes any temporary files that had been
    * created, after which the state of the index will be the
    * same as it was when this writer was first opened.  This
    * can only be called when this IndexWriter was opened
-   * with <code>autoCommit=false</code>.
+   * with <code>autoCommit=false</code>.  This also clears a
+   * previous call to {@link #prepareCommit}.
    * @throws IllegalStateException if this is called when
    *  the writer was opened with <code>autoCommit=true</code>.
    * @throws IOException if there is a low-level IO error
    */
-  public void abort() throws IOException {
+  public void rollback() throws IOException {
     ensureOpen();
     if (autoCommit)
       throw new IllegalStateException("abort() can only be called when IndexWriter was opened with autoCommit=false");
 
     boolean doClose;
     synchronized(this) {
+
+      if (pendingCommit != null) {
+        pendingCommit.rollbackCommit(directory);
+        deleter.decRef(pendingCommit);
+        pendingCommit = null;
+        notifyAll();
+      }
+
       // Ensure that only one thread actually gets to do the closing:
       if (!closing) {
         doClose = true;
@@ -3113,10 +3131,54 @@
     flush(true, false, true);
   }
 
+  /** <p>Expert: prepare for commit.  This does the first
+   *  phase of 2-phase commit.  You can only call this when
+   *  autoCommit is false.  This method does all steps
+   *  necessary to commit changes since this writer was
+   *  opened: flushes pending added and deleted docs, syncs
+   *  the index files, writes most of next segments_N file.
+   *  After calling this you must call either {@link
+   *  #commit()} to finish the commit, or {@link
+   *  #rollback()} to revert the commit and undo all changes
+   *  done since the writer was opened.</p>
+   *
+   * You can also just call {@link #commit()} directly
+   * without prepareCommit first in which case that method
+   * will internally call prepareCommit.
+   */
+  public final void prepareCommit() throws CorruptIndexException, IOException {
+    prepareCommit(false);
+  }
+
+  private final void prepareCommit(boolean internal) throws CorruptIndexException, IOException {
+
+    if (hitOOM)
+      throw new IllegalStateException("this writer hit an OutOfMemoryError; cannot commit");
+
+    if (autoCommit && !internal)
+      throw new IllegalStateException("this method can only be used when autoCommit is false");
+
+    if (!autoCommit && pendingCommit != null)
+      throw new IllegalStateException("prepareCommit was already called with no corresponding call to commit");
+
+    message("prepareCommit: flush");
+
+    flush(true, true, true);
+
+    startCommit(0);
+  }
+
+  private void commit(long sizeInBytes) throws IOException {
+    startCommit(sizeInBytes);
+    finishCommit();
+  }
+
   /**
-   * <p>Commits all pending updates (added & deleted documents)
-   * to the index, and syncs all referenced index files,
-   * such that a reader will see the changes.  Note that
+   * <p>Commits all pending updates (added & deleted
+   * documents) to the index, and syncs all referenced index
+   * files, such that a reader will see the changes and the
+   * index updates will survive an OS or machine crash or
+   * power loss (though, see the note below).  Note that
    * this does not wait for any running background merges to
    * finish.  This may be a costly operation, so you should
    * test the cost in your application and do it only when
@@ -3135,12 +3197,38 @@
    * consistency on such devices.  </p>
    */
   public final void commit() throws CorruptIndexException, IOException {
-    commit(true);
+
+    message("commit: start");
+
+    if (autoCommit || pendingCommit == null) {
+      message("commit: now prepare");
+      prepareCommit(true);
+    } else
+      message("commit: already prepared");
+
+    finishCommit();
   }
 
-  private final void commit(boolean triggerMerges) throws CorruptIndexException, IOException {
-    flush(triggerMerges, true, true);
-    commit(true, 0);
+  private synchronized final void finishCommit() throws CorruptIndexException, IOException {
+
+    if (pendingCommit != null) {
+      try {
+        message("commit: pendingCommit != null");
+        pendingCommit.finishCommit(directory);
+        lastCommitChangeCount = pendingCommitChangeCount;
+        segmentInfos.updateGeneration(pendingCommit);
+        setRollbackSegmentInfos();
+        deleter.checkpoint(pendingCommit, true);
+      } finally {
+        deleter.decRef(pendingCommit);
+        pendingCommit = null;
+        notifyAll();
+      }
+
+    } else
+      message("commit: pendingCommit == null; skip");
+
+    message("commit: done");
   }
 
   /**
@@ -3176,8 +3264,7 @@
     // when flushing a segment; otherwise deletes may become
     // visible before their corresponding added document
     // from an updateDocument call
-    if (autoCommit)
-      flushDeletes = true;
+    flushDeletes |= autoCommit;
 
     // Returns true if docWriter is currently aborting, in
     // which case we skip flushing this segment
@@ -3935,7 +4022,7 @@
         synchronized(this) {
           size = merge.info.sizeInBytes();
         }
-        commit(false, size);
+        commit(size);
       }
       
       success = false;
@@ -3988,7 +4075,7 @@
       synchronized(this) {
         size = merge.info.sizeInBytes();
       }
-      commit(false, size);
+      commit(size);
     }
 
     return mergedDocCount;
@@ -4151,13 +4238,13 @@
   }
 
   /** Walk through all files referenced by the current
-   *  segmentInfos, minus flushes, and ask the Directory to
-   *  sync each file, if it wasn't already.  If that
-   *  succeeds, then we write a new segments_N file & sync
-   *  that. */
-  private void commit(boolean skipWait, long sizeInBytes) throws IOException {
+   *  segmentInfos and ask the Directory to sync each file,
+   *  if it wasn't already.  If that succeeds, then we
+   *  prepare a new segments_N file but do not fully commit
+   *  it. */
+  private void startCommit(long sizeInBytes) throws IOException {
 
-    assert testPoint("startCommit");
+    assert testPoint("startStartCommit");
 
     if (hitOOM)
       return;
@@ -4165,9 +4252,9 @@
     try {
 
       if (infoStream != null)
-        message("start commit() skipWait=" + skipWait + " sizeInBytes=" + sizeInBytes);
+        message("startCommit(): start sizeInBytes=" + sizeInBytes);
 
-      if (!skipWait)
+      if (sizeInBytes > 0)
         syncPause(sizeInBytes);
 
       SegmentInfos toSync = null;
@@ -4179,7 +4266,7 @@
 
         if (changeCount == lastCommitChangeCount) {
           if (infoStream != null)
-            message("  skip commit(): no changes pending");
+            message("  skip startCommit(): no changes pending");
           return;
         }
 
@@ -4189,15 +4276,17 @@
         // threads can be doing this at once, if say a large
         // merge and a small merge finish at the same time:
 
+        if (infoStream != null)
+          message("startCommit index=" + segString(segmentInfos) + " changeCount=" + changeCount);
+
         toSync = (SegmentInfos) segmentInfos.clone();
         deleter.incRef(toSync, false);
         myChangeCount = changeCount;
       }
 
-      if (infoStream != null)
-        message("commit index=" + segString(toSync));
+      assert testPoint("midStartCommit");
 
-      assert testPoint("midCommit");
+      boolean setPending = false;
 
       try {
 
@@ -4237,54 +4326,72 @@
             break;
         }
 
-        assert testPoint("midCommit2");
-      
+        assert testPoint("midStartCommit2");
+
         synchronized(this) {
           // If someone saved a newer version of segments file
           // since I first started syncing my version, I can
           // safely skip saving myself since I've been
           // superseded:
-          if (myChangeCount > lastCommitChangeCount) {
-          
+          if (myChangeCount > lastCommitChangeCount && (pendingCommit == null || myChangeCount > pendingCommitChangeCount)) {
+
+            // Wait now for any current pending commit to complete:
+            while(pendingCommit != null) {
+              message("wait for existing pendingCommit to finish...");
+              try {
+                wait();
+              } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+              }
+            }
+
             if (segmentInfos.getGeneration() > toSync.getGeneration())
               toSync.updateGeneration(segmentInfos);
 
             boolean success = false;
             try {
-              toSync.commit(directory);
+
+              // Exception here means nothing is prepared
+              // (this method unwinds everything it did on
+              // an exception)
+              try {
+                toSync.prepareCommit(directory);
+              } finally {
+                // Have our master segmentInfos record the
+                // generations we just prepared.  We do this
+                // on error or success so we don't
+                // double-write a segments_N file.
+                segmentInfos.updateGeneration(toSync);
+              }
+
+              assert pendingCommit == null;
+              setPending = true;
+              pendingCommit = toSync;
+              pendingCommitChangeCount = myChangeCount;
               success = true;
             } finally {
-              // Have our master segmentInfos record the
-              // generations we just sync'd
-              segmentInfos.updateGeneration(toSync);
               if (!success)
                 message("hit exception committing segments file");
             }
-
-            message("commit complete");
-
-            lastCommitChangeCount = myChangeCount;
-
-            deleter.checkpoint(toSync, true);
-            setRollbackSegmentInfos();
           } else
             message("sync superseded by newer infos");
         }
 
         message("done all syncs");
 
-        assert testPoint("midCommitSuccess");
+        assert testPoint("midStartCommitSuccess");
 
       } finally {
         synchronized(this) {
-          deleter.decRef(toSync);
+          if (!setPending)
+            deleter.decRef(toSync);
         }
       }
     } catch (OutOfMemoryError oom) {
       hitOOM = true;
       throw oom;
     }
-    assert testPoint("finishCommit");
+    assert testPoint("finishStartCommit");
   }
 
   /**
@@ -4377,11 +4484,11 @@
   // Used only by assert for testing.  Current points:
   //   startDoFlush
   //   startCommitMerge
-  //   startCommit
-  //   midCommit
-  //   midCommit2
-  //   midCommitSuccess
-  //   finishCommit
+  //   startStartCommit
+  //   midStartCommit
+  //   midStartCommit2
+  //   midStartCommitSuccess
+  //   finishStartCommit
   //   startCommitMergeDeletes
   //   startMergeInit
   //   startApplyDeletes

Modified: lucene/java/trunk/src/java/org/apache/lucene/index/SegmentInfos.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/index/SegmentInfos.java?rev=653878&r1=653877&r2=653878&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/index/SegmentInfos.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/index/SegmentInfos.java Tue May  6 11:41:10 2008
@@ -274,6 +274,10 @@
     }.run();
   }
 
+  // Only non-null after prepareCommit has been called and
+  // before finishCommit is called
+  ChecksumIndexOutput pendingOutput;
+
   private final void write(Directory directory) throws IOException {
 
     String segmentFileName = getNextSegmentFileName();
@@ -298,53 +302,27 @@
       for (int i = 0; i < size(); i++) {
         info(i).write(output);
       }
-      final long checksum = output.getChecksum();
-      output.writeLong(checksum);
+      output.prepareCommit();
       success = true;
+      pendingOutput = output;
     } finally {
-      boolean success2 = false;
-      try {
-        if (!success) {
-          // We hit an exception above; try to close the file
-          // but suppress any exception:
-          try {
-            output.close();
-            success2 = true;
-          } catch (Throwable t) {
-            // Suppress so we keep throwing the original exception
-          }
-        } else {
+      if (!success) {
+        // We hit an exception above; try to close the file
+        // but suppress any exception:
+        try {
           output.close();
-          success2 = true;
+        } catch (Throwable t) {
+          // Suppress so we keep throwing the original exception
         }
-      } finally {
-        if (!success || !success2) {
-          try {
-            // Try not to leave a truncated segments_N file in
-            // the index:
-            directory.deleteFile(segmentFileName);
-          } catch (Throwable t) {
-            // Suppress so we keep throwing the original exception
-          }
+        try {
+          // Try not to leave a truncated segments_N file in
+          // the index:
+          directory.deleteFile(segmentFileName);
+        } catch (Throwable t) {
+          // Suppress so we keep throwing the original exception
         }
       }
     }
-
-    try {
-      IndexOutput genOutput = directory.createOutput(IndexFileNames.SEGMENTS_GEN);
-      try {
-        genOutput.writeInt(FORMAT_LOCKLESS);
-        genOutput.writeLong(generation);
-        genOutput.writeLong(generation);
-      } finally {
-        genOutput.close();
-      }
-    } catch (IOException e) {
-      // It's OK if we fail to write this file since it's
-      // used only as one of the retry fallbacks.
-    }
-    
-    lastGeneration = generation;
   }
 
   /**
@@ -355,7 +333,7 @@
   public Object clone() {
     SegmentInfos sis = (SegmentInfos) super.clone();
     for(int i=0;i<sis.size();i++) {
-      sis.setElementAt(((SegmentInfo) sis.elementAt(i)).clone(), i);
+      sis.setElementAt(sis.info(i).clone(), i);
     }
     return sis;
   }
@@ -739,45 +717,73 @@
 
   // Carry over generation numbers from another SegmentInfos
   void updateGeneration(SegmentInfos other) {
-    assert other.generation > generation;
     lastGeneration = other.lastGeneration;
     generation = other.generation;
     version = other.version;
   }
 
-  /** Writes & syncs to the Directory dir, taking care to
-   *  remove the segments file on exception */
-  public final void commit(Directory dir) throws IOException {
-    boolean success = false;
-    try {
-      write(dir);
-      success = true;
-    } finally {
-      if (!success) {
-        // Must carefully compute fileName from "generation"
-        // since lastGeneration isn't incremented:
+  public final void rollbackCommit(Directory dir) throws IOException {
+    if (pendingOutput != null) {
+      try {
+        pendingOutput.close();
+      } catch (Throwable t) {
+        // Suppress so we keep throwing the original exception
+        // in our caller
+      }
+
+      // Must carefully compute fileName from "generation"
+      // since lastGeneration isn't incremented:
+      try {
         final String segmentFileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS,
                                                                              "",
                                                                              generation);
-        try {
-          dir.deleteFile(segmentFileName);
-        } catch (Throwable t) {
-          // Suppress so we keep throwing the original exception
-        }
+        dir.deleteFile(segmentFileName);
+      } catch (Throwable t) {
+        // Suppress so we keep throwing the original exception
+        // in our caller
       }
+      pendingOutput = null;
+    }
+  }
+
+  /** Call this to start a commit.  This writes the new
+   *  segments file, but writes an invalid checksum at the
+   *  end, so that it is not visible to readers.  Once this
+   *  is called you must call {@link #finishCommit} to complete
+   *  the commit or {@link #rollbackCommit} to abort it. */
+  public final void prepareCommit(Directory dir) throws IOException {
+    if (pendingOutput != null)
+      throw new IllegalStateException("prepareCommit was already called");
+    write(dir);
+  }
+
+  public final void finishCommit(Directory dir) throws IOException {
+    if (pendingOutput == null)
+      throw new IllegalStateException("prepareCommit was not called");
+    boolean success = false;
+    try {
+      pendingOutput.finishCommit();
+      pendingOutput.close();
+      pendingOutput = null;
+      success = true;
+    } finally {
+      if (!success)
+        rollbackCommit(dir);
     }
 
     // NOTE: if we crash here, we have left a segments_N
     // file in the directory in a possibly corrupt state (if
     // some bytes made it to stable storage and others
-    // didn't).  But, the segments_N file now includes
-    // checksum at the end, which should catch this case.
-    // So when a reader tries to read it, it will throw a
+    // didn't).  But, the segments_N file includes checksum
+    // at the end, which should catch this case.  So when a
+    // reader tries to read it, it will throw a
     // CorruptIndexException, which should cause the retry
     // logic in SegmentInfos to kick in and load the last
     // good (previous) segments_N-1 file.
 
-    final String fileName = getCurrentSegmentFileName();
+    final String fileName = IndexFileNames.fileNameFromGeneration(IndexFileNames.SEGMENTS,
+                                                                  "",
+                                                                  generation);
     success = false;
     try {
       dir.sync(fileName);
@@ -791,5 +797,28 @@
         }
       }
     }
+
+    lastGeneration = generation;
+
+    try {
+      IndexOutput genOutput = dir.createOutput(IndexFileNames.SEGMENTS_GEN);
+      try {
+        genOutput.writeInt(FORMAT_LOCKLESS);
+        genOutput.writeLong(generation);
+        genOutput.writeLong(generation);
+      } finally {
+        genOutput.close();
+      }
+    } catch (Throwable t) {
+      // It's OK if we fail to write this file since it's
+      // used only as one of the retry fallbacks.
+    }
+  }
+
+  /** Writes & syncs to the Directory dir, taking care to
+   *  remove the segments file on exception */
+  public final void commit(Directory dir) throws IOException {
+    prepareCommit(dir);
+    finishCommit(dir);
   }
 }

Modified: lucene/java/trunk/src/java/org/apache/lucene/store/ChecksumIndexOutput.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/java/org/apache/lucene/store/ChecksumIndexOutput.java?rev=653878&r1=653877&r2=653878&view=diff
==============================================================================
--- lucene/java/trunk/src/java/org/apache/lucene/store/ChecksumIndexOutput.java (original)
+++ lucene/java/trunk/src/java/org/apache/lucene/store/ChecksumIndexOutput.java Tue May  6 11:41:10 2008
@@ -62,6 +62,30 @@
     throw new RuntimeException("not allowed");    
   }
 
+  /**
+   * Starts but does not complete the commit of this file (=
+   * writing of the final checksum at the end).  After this
+   * is called must call {@link #finishCommit} and the
+   * {@link #close} to complete the commit.
+   */
+  public void prepareCommit() throws IOException {
+    final long checksum = getChecksum();
+    // Intentionally write a mismatched checksum.  This is
+    // because we want to 1) test, as best we can, that we
+    // are able to write a long to the file, but 2) not
+    // actually "commit" the file yet.  This (prepare
+    // commit) is phase 1 of a two-phase commit.
+    final long pos = main.getFilePointer();
+    main.writeLong(checksum-1);
+    main.flush();
+    main.seek(pos);
+  }
+
+  /** See {@link #prepareCommit} */
+  public void finishCommit() throws IOException {
+    main.writeLong(getChecksum());
+  }
+
   public long length() throws IOException {
     return main.length();
   }

Modified: lucene/java/trunk/src/test/org/apache/lucene/index/TestAtomicUpdate.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/index/TestAtomicUpdate.java?rev=653878&r1=653877&r2=653878&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/index/TestAtomicUpdate.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/index/TestAtomicUpdate.java Tue May  6 11:41:10 2008
@@ -68,6 +68,7 @@
           count++;
         }
       } catch (Throwable e) {
+        System.out.println(Thread.currentThread().getName() + ": exc");
         e.printStackTrace(System.out);
         failed = true;
       }
@@ -111,11 +112,7 @@
 
     public void doWork() throws Throwable {
       IndexReader r = IndexReader.open(directory);
-      try {
-        assertEquals(100, r.numDocs());
-      } catch (Throwable t) {
-        throw t;
-      }
+      assertEquals(100, r.numDocs());
       r.close();
     }
   }
@@ -141,6 +138,10 @@
     }
     writer.commit();
 
+    IndexReader r = IndexReader.open(directory);
+    assertEquals(100, r.numDocs());
+    r.close();
+
     IndexerThread indexerThread = new IndexerThread(writer, threads);
     threads[0] = indexerThread;
     indexerThread.start();

Modified: lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java?rev=653878&r1=653877&r2=653878&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/index/TestIndexWriter.java Tue May  6 11:41:10 2008
@@ -3302,7 +3302,7 @@
       boolean isCommit = false;
       boolean isDelete = false;
       for (int i = 0; i < trace.length; i++) {
-        if ("org.apache.lucene.index.SegmentInfos".equals(trace[i].getClassName()) && "commit".equals(trace[i].getMethodName()))
+        if ("org.apache.lucene.index.SegmentInfos".equals(trace[i].getClassName()) && "prepareCommit".equals(trace[i].getMethodName()))
           isCommit = true;
         if ("org.apache.lucene.store.MockRAMDirectory".equals(trace[i].getClassName()) && "deleteFile".equals(trace[i].getMethodName()))
           isDelete = true;
@@ -3603,4 +3603,124 @@
     s.close();
     dir.close();
   }
+
+  // LUCENE-1274: test writer.prepareCommit()
+  public void testPrepareCommit() throws IOException {
+    Directory dir = new MockRAMDirectory();
+
+    IndexWriter writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
+    writer.setMaxBufferedDocs(2);
+    writer.setMergeFactor(5);
+
+    for (int i = 0; i < 23; i++)
+      addDoc(writer);
+
+    IndexReader reader = IndexReader.open(dir);
+    assertEquals(0, reader.numDocs());
+
+    writer.prepareCommit();
+
+    IndexReader reader2 = IndexReader.open(dir);
+    assertEquals(0, reader2.numDocs());
+
+    writer.commit();
+
+    IndexReader reader3 = reader.reopen();
+    assertEquals(0, reader.numDocs());
+    assertEquals(0, reader2.numDocs());
+    assertEquals(23, reader3.numDocs());
+    reader.close();
+    reader2.close();
+
+    for (int i = 0; i < 17; i++)
+      addDoc(writer);
+
+    assertEquals(23, reader3.numDocs());
+    reader3.close();
+    reader = IndexReader.open(dir);
+    assertEquals(23, reader.numDocs());
+    reader.close();
+
+    writer.prepareCommit();
+
+    reader = IndexReader.open(dir);
+    assertEquals(23, reader.numDocs());
+    reader.close();
+
+    writer.commit();
+    reader = IndexReader.open(dir);
+    assertEquals(40, reader.numDocs());
+    reader.close();
+    writer.close();
+    dir.close();
+  }
+
+  // LUCENE-1274: test writer.prepareCommit()
+  public void testPrepareCommitRollback() throws IOException {
+    MockRAMDirectory dir = new MockRAMDirectory();
+    dir.setPreventDoubleWrite(false);
+
+    IndexWriter writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
+
+    writer.setMaxBufferedDocs(2);
+    writer.setMergeFactor(5);
+
+    for (int i = 0; i < 23; i++)
+      addDoc(writer);
+
+    IndexReader reader = IndexReader.open(dir);
+    assertEquals(0, reader.numDocs());
+
+    writer.prepareCommit();
+
+    IndexReader reader2 = IndexReader.open(dir);
+    assertEquals(0, reader2.numDocs());
+
+    writer.rollback();
+
+    IndexReader reader3 = reader.reopen();
+    assertEquals(0, reader.numDocs());
+    assertEquals(0, reader2.numDocs());
+    assertEquals(0, reader3.numDocs());
+    reader.close();
+    reader2.close();
+
+    writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
+    for (int i = 0; i < 17; i++)
+      addDoc(writer);
+
+    assertEquals(0, reader3.numDocs());
+    reader3.close();
+    reader = IndexReader.open(dir);
+    assertEquals(0, reader.numDocs());
+    reader.close();
+
+    writer.prepareCommit();
+
+    reader = IndexReader.open(dir);
+    assertEquals(0, reader.numDocs());
+    reader.close();
+
+    writer.commit();
+    reader = IndexReader.open(dir);
+    assertEquals(17, reader.numDocs());
+    reader.close();
+    writer.close();
+    dir.close();
+  }
+
+  // LUCENE-1274
+  public void testPrepareCommitNoChanges() throws IOException {
+    MockRAMDirectory dir = new MockRAMDirectory();
+
+    IndexWriter writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
+    writer.prepareCommit();
+    writer.commit();
+    writer.close();
+
+    IndexReader reader = IndexReader.open(dir);
+    assertEquals(0, reader.numDocs());
+    reader.close();
+    dir.close();
+  }
 }

Modified: lucene/java/trunk/src/test/org/apache/lucene/index/TestStressIndexing.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/index/TestStressIndexing.java?rev=653878&r1=653877&r2=653878&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/index/TestStressIndexing.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/index/TestStressIndexing.java Tue May  6 11:41:10 2008
@@ -53,6 +53,7 @@
           count++;
         }
       } catch (Throwable e) {
+        System.out.println(Thread.currentThread() + ": exc");
         e.printStackTrace(System.out);
         failed = true;
       }

Added: lucene/java/trunk/src/test/org/apache/lucene/index/TestTransactions.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/index/TestTransactions.java?rev=653878&view=auto
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/index/TestTransactions.java (added)
+++ lucene/java/trunk/src/test/org/apache/lucene/index/TestTransactions.java Tue May  6 11:41:10 2008
@@ -0,0 +1,217 @@
+package org.apache.lucene.index;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.Random;
+import org.apache.lucene.store.*;
+import org.apache.lucene.util.*;
+import org.apache.lucene.analysis.*;
+import org.apache.lucene.document.*;
+
+public class TestTransactions extends LuceneTestCase
+{
+  private static final Random RANDOM = new Random();
+  private static volatile boolean doFail;
+
+  private class RandomFailure extends MockRAMDirectory.Failure {
+    public void eval(MockRAMDirectory dir) throws IOException {
+      if (TestTransactions.doFail && RANDOM.nextInt() % 10 <= 3)
+        throw new IOException("now failing randomly but on purpose");
+    }
+  }
+
+  private static abstract class TimedThread extends Thread {
+    boolean failed;
+    private static int RUN_TIME_SEC = 6;
+    private TimedThread[] allThreads;
+
+    abstract public void doWork() throws Throwable;
+
+    TimedThread(TimedThread[] threads) {
+      this.allThreads = threads;
+    }
+
+    public void run() {
+      final long stopTime = System.currentTimeMillis() + 1000*RUN_TIME_SEC;
+
+      try {
+        while(System.currentTimeMillis() < stopTime && !anyErrors())
+          doWork();
+      } catch (Throwable e) {
+        System.out.println(Thread.currentThread() + ": exc");
+        e.printStackTrace(System.out);
+        failed = true;
+      }
+    }
+
+    private boolean anyErrors() {
+      for(int i=0;i<allThreads.length;i++)
+        if (allThreads[i] != null && allThreads[i].failed)
+          return true;
+      return false;
+    }
+  }
+
+  private static class IndexerThread extends TimedThread {
+    Directory dir1;
+    Directory dir2;
+    Object lock;
+    int nextID;
+
+    public IndexerThread(Object lock, Directory dir1, Directory dir2, TimedThread[] threads) {
+      super(threads);
+      this.lock = lock;
+      this.dir1 = dir1;
+      this.dir2 = dir2;
+    }
+
+    public void doWork() throws Throwable {
+
+      IndexWriter writer1 = new IndexWriter(dir1, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
+      writer1.setMaxBufferedDocs(3);
+      writer1.setMergeFactor(2);
+      ((ConcurrentMergeScheduler) writer1.getMergeScheduler()).setSuppressExceptions();
+
+      IndexWriter writer2 = new IndexWriter(dir2, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
+      // Intentionally use different params so flush/merge
+      // happen @ different times
+      writer2.setMaxBufferedDocs(2);
+      writer2.setMergeFactor(3);
+      ((ConcurrentMergeScheduler) writer2.getMergeScheduler()).setSuppressExceptions();
+
+      update(writer1);
+      update(writer2);
+
+      TestTransactions.doFail = true;
+      try {
+        synchronized(lock) {
+          try {
+            writer1.prepareCommit();
+          } catch (Throwable t) {
+            writer1.rollback();
+            writer2.rollback();
+            return;
+          }
+          try {
+            writer2.prepareCommit();
+          } catch (Throwable t) { 	
+            writer1.rollback();
+            writer2.rollback();
+            return;
+          }
+
+          writer1.commit();
+          writer2.commit();
+        }
+      } finally {
+        TestTransactions.doFail = false;
+      }  
+
+      writer1.close();
+      writer2.close();
+    }
+
+    public void update(IndexWriter writer) throws IOException {
+      // Add 10 docs:
+      for(int j=0; j<10; j++) {
+        Document d = new Document();
+        int n = RANDOM.nextInt();
+        d.add(new Field("id", Integer.toString(nextID++), Field.Store.YES, Field.Index.UN_TOKENIZED));
+        d.add(new Field("contents", English.intToEnglish(n), Field.Store.NO, Field.Index.TOKENIZED));
+        writer.addDocument(d);
+      }
+
+      // Delete 5 docs:
+      int deleteID = nextID-1;
+      for(int j=0; j<5; j++) {
+        writer.deleteDocuments(new Term("id", ""+deleteID));
+        deleteID -= 2;
+      }
+    }
+  }
+
+  private static class SearcherThread extends TimedThread {
+    Directory dir1;
+    Directory dir2;
+    Object lock;
+
+    public SearcherThread(Object lock, Directory dir1, Directory dir2, TimedThread[] threads) {
+      super(threads);
+      this.lock = lock;
+      this.dir1 = dir1;
+      this.dir2 = dir2;
+    }
+
+    public void doWork() throws Throwable {
+      IndexReader r1, r2;
+      synchronized(lock) {
+        r1 = IndexReader.open(dir1);
+        r2 = IndexReader.open(dir2);
+      }
+      if (r1.numDocs() != r2.numDocs())
+        throw new RuntimeException("doc counts differ: r1=" + r1.numDocs() + " r2=" + r2.numDocs());
+      r1.close();
+      r2.close();
+    }
+  }
+
+  public void initIndex(Directory dir) throws Throwable {
+    IndexWriter writer = new IndexWriter(dir, false, new WhitespaceAnalyzer(), IndexWriter.MaxFieldLength.LIMITED);
+    for(int j=0; j<7; j++) {
+      Document d = new Document();
+      int n = RANDOM.nextInt();
+      d.add(new Field("contents", English.intToEnglish(n), Field.Store.NO, Field.Index.TOKENIZED));
+      writer.addDocument(d);
+    }
+    writer.close();
+  }
+
+  public void testTransactions() throws Throwable {
+    MockRAMDirectory dir1 = new MockRAMDirectory();
+    MockRAMDirectory dir2 = new MockRAMDirectory();
+    dir1.setPreventDoubleWrite(false);
+    dir2.setPreventDoubleWrite(false);
+    dir1.failOn(new RandomFailure());
+    dir2.failOn(new RandomFailure());
+
+    initIndex(dir1);
+    initIndex(dir2);
+
+    TimedThread[] threads = new TimedThread[3];
+    int numThread = 0;
+
+    IndexerThread indexerThread = new IndexerThread(this, dir1, dir2, threads);
+    threads[numThread++] = indexerThread;
+    indexerThread.start();
+
+    SearcherThread searcherThread1 = new SearcherThread(this, dir1, dir2, threads);
+    threads[numThread++] = searcherThread1;
+    searcherThread1.start();
+
+    SearcherThread searcherThread2 = new SearcherThread(this, dir1, dir2, threads);
+    threads[numThread++] = searcherThread2;
+    searcherThread2.start();
+
+    for(int i=0;i<numThread;i++)
+      threads[i].join();
+
+    for(int i=0;i<numThread;i++)
+      assertTrue(!((TimedThread) threads[i]).failed);
+  }
+}

Modified: lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMDirectory.java
URL: http://svn.apache.org/viewvc/lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMDirectory.java?rev=653878&r1=653877&r2=653878&view=diff
==============================================================================
--- lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMDirectory.java (original)
+++ lucene/java/trunk/src/test/org/apache/lucene/store/MockRAMDirectory.java Tue May  6 11:41:10 2008
@@ -209,7 +209,7 @@
       throw new IOException("cannot createOutput after crash");
     init();
     synchronized(openFiles) {
-      if (preventDoubleWrite && createdFiles.contains(name))
+      if (preventDoubleWrite && createdFiles.contains(name) && !name.equals("segments.gen"))
         throw new IOException("file \"" + name + "\" was already written to");
       if (noDeleteOpenFile && openFiles.containsKey(name))
        throw new IOException("MockRAMDirectory: file \"" + name + "\" is still open: cannot overwrite");